ea9364153cb12bc1d12c0fdadfc522fdff81dace
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         1
70 #define STREAMING_INFORMHOME                    0
71 CpvDeclare(int, _crashedNode);
72
73 // static, so that it is accessible from Converse part
74 int CkMemCheckPT::inRestarting = 0;
75 int CkMemCheckPT::inLoadbalancing = 0;
76 double CkMemCheckPT::startTime;
77 char *CkMemCheckPT::stage;
78 CkCallback CkMemCheckPT::cpCallback;
79
80 int _memChkptOn = 1;                    // checkpoint is on or off
81
82 CkGroupID ckCheckPTGroupID;             // readonly
83
84 static int checkpointed = 0;
85
86 /// @todo the following declarations should be moved into a separate file for all 
87 // fault tolerant strategies
88
89 #ifdef CMK_MEM_CHECKPOINT
90 // name of the kill file that contains processes to be killed 
91 char *killFile;                                               
92 // flag for the kill file         
93 int killFlag=0;
94 // variable for storing the killing time
95 double killTime=0.0;
96 #endif
97
98 #ifdef CKLOCMGR_LOOP
99 #undef CKLOCMGR_LOOP
100 #endif
101 // loop over all CkLocMgr and do "code"
102 #define  CKLOCMGR_LOOP(code)    {       \
103   int numGroups = CkpvAccess(_groupIDTable)->size();    \
104   for(int i=0;i<numGroups;i++) {        \
105     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
106     if(obj->isLocMgr())  {      \
107       CkLocMgr *mgr = (CkLocMgr*)obj;   \
108       code      \
109     }   \
110   }     \
111  }
112
113 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
114 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
115
116 // compute the backup processor
117 // FIXME: avoid crashed processors
118 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
119
120 inline int CkMemCheckPT::BuddyPE(int pe)
121 {
122   int budpe;
123 #if NODE_CHECKPOINT
124     // buddy is the processor with same rank on the next physical node
125   int r1 = CmiPhysicalRank(pe);
126   int budnode = CmiPhysicalNodeID(pe);
127   do {
128     budnode = (budnode+1)%CmiNumPhysicalNodes();
129     int *pelist;
130     int num;
131     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
132     budpe = pelist[r1 % num];
133   } while (isFailed(budpe));
134   if (budpe == pe) {
135     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
136     CmiAbort("Failed to find a buddy processor");
137   }
138 #else
139   budpe = pe;
140   while (budpe == pe || isFailed(budpe)) 
141           budpe = (budpe+1)%CkNumPes();
142 #endif
143   return budpe;
144 }
145
146 // called in array element constructor
147 // choose and register with 2 buddies for checkpoiting 
148 #if CMK_MEM_CHECKPOINT
149 void ArrayElement::init_checkpt() {
150         if (_memChkptOn == 0) return;
151         if (CkInRestarting()) {
152           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
153         }
154         // only master init checkpoint
155         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
156
157         budPEs[0] = CkMyPe();
158         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
159         CmiAssert(budPEs[0] != budPEs[1]);
160         // inform checkPTMgr
161         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
162         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
163         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
164         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
165 }
166 #endif
167
168 // entry function invoked by checkpoint mgr asking for checkpoint data
169 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
170 #if CMK_MEM_CHECKPOINT
171 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
172 //char index[128];   thisIndexMax.sprint(index);
173 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
174   CkLocMgr *locMgr = thisArray->getLocMgr();
175   CmiAssert(myRec!=NULL);
176   int size;
177   {
178         PUP::sizer p;
179         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
180         size = p.size();
181   }
182   int packSize = size/sizeof(double) +1;
183   CkArrayCheckPTMessage *msg =
184                  new (packSize, 0) CkArrayCheckPTMessage;
185   msg->len = size;
186   msg->index =thisIndexMax;
187   msg->aid = thisArrayID;
188   msg->locMgr = locMgr->getGroupID();
189   msg->cp_flag = 1;
190   {
191         PUP::toMem p(msg->packData);
192         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
193   }
194
195   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
196   checkptMgr.recvData(msg, 2, budPEs);
197   delete m;
198 #endif
199 }
200
201 // checkpoint holder class - for memory checkpointing
202 class CkMemCheckPTInfo: public CkCheckPTInfo
203 {
204   CkArrayCheckPTMessage *ckBuffer;
205 public:
206   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
207                     CkCheckPTInfo(a, loc, idx, pno)
208   {
209     ckBuffer = NULL;
210   }
211   ~CkMemCheckPTInfo() 
212   {
213     if (ckBuffer) delete ckBuffer; 
214   }
215   inline void updateBuffer(CkArrayCheckPTMessage *data) 
216   {
217     CmiAssert(data!=NULL);
218     if (ckBuffer) delete ckBuffer;
219     ckBuffer = data;
220   }    
221   inline CkArrayCheckPTMessage * getCopy()
222   {
223     if (ckBuffer == NULL) {
224       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
225       CmiAbort("Abort!");
226     }
227     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
228   }     
229   inline void updateBuddy(int b1, int b2) {
230      CmiAssert(ckBuffer);
231      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
232      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
233      CmiAssert(pNo != CkMyPe());
234   }
235   inline int getSize() { 
236      CmiAssert(ckBuffer);
237      return ckBuffer->len; 
238   }
239 };
240
241 // checkpoint holder class - for in-disk checkpointing
242 class CkDiskCheckPTInfo: public CkCheckPTInfo 
243 {
244   char *fname;
245   int bud1, bud2;
246   int len;                      // checkpoint size
247 public:
248   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
249   {
250 #if CMK_USE_MKSTEMP
251     fname = new char[64];
252     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
253     mkstemp(fname);
254 #else
255     fname=tmpnam(NULL);
256 #endif
257     bud1 = bud2 = -1;
258     len = 0;
259   }
260   ~CkDiskCheckPTInfo() 
261   {
262     remove(fname);
263   }
264   inline void updateBuffer(CkArrayCheckPTMessage *data) 
265   {
266     double t = CmiWallTimer();
267     // unpack it
268     envelope *env = UsrToEnv(data);
269     CkUnpackMessage(&env);
270     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
271     FILE *f = fopen(fname,"wb");
272     PUP::toDisk p(f);
273     CkPupMessage(p, (void **)&data);
274     // delay sync to the end because otherwise the messages are blocked
275 //    fsync(fileno(f));
276     fclose(f);
277     bud1 = data->bud1;
278     bud2 = data->bud2;
279     len = data->len;
280     delete data;
281     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
282   }
283   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
284   {
285     CkArrayCheckPTMessage *data;
286     FILE *f = fopen(fname,"rb");
287     PUP::fromDisk p(f);
288     CkPupMessage(p, (void **)&data);
289     fclose(f);
290     data->bud1 = bud1;                          // update the buddies
291     data->bud2 = bud2;
292     return data;
293   }
294   inline void updateBuddy(int b1, int b2) {
295      bud1 = b1; bud2 = b2;
296      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
297      CmiAssert(pNo != CkMyPe());
298   }
299   inline int getSize() { 
300      return len; 
301   }
302 };
303
304 CkMemCheckPT::CkMemCheckPT(int w)
305 {
306   int numnodes = 0;
307 #if NODE_CHECKPOINT
308   numnodes = CmiNumPhysicalNodes();
309 #else
310   numnodes = CkNumPes();
311 #endif
312 #if CK_NO_PROC_POOL
313   if (numnodes <= 2)
314 #else
315   if (numnodes  == 1)
316 #endif
317   {
318     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
319     _memChkptOn = 0;
320   }
321   inRestarting = 0;
322   recvCount = peCount = 0;
323   ackCount = 0;
324   expectCount = -1;
325   where = w;
326
327 #if CMK_CONVERSE_MPI
328   void pingBuddy();
329   void pingCheckHandler();
330   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
331   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
332 #endif
333 }
334
335 CkMemCheckPT::~CkMemCheckPT()
336 {
337   int len = ckTable.length();
338   for (int i=0; i<len; i++) {
339     delete ckTable[i];
340   }
341 }
342
343 void CkMemCheckPT::pup(PUP::er& p) 
344
345   CBase_CkMemCheckPT::pup(p); 
346   p|cpStarter;
347   p|thisFailedPe;
348   p|failedPes;
349   p|ckCheckPTGroupID;           // recover global variable
350   p|cpCallback;                 // store callback
351   p|where;                      // where to checkpoint
352   p|peCount;
353   if (p.isUnpacking()) {
354     recvCount = 0;
355 #if CMK_CONVERSE_MPI
356     void pingBuddy();
357     void pingCheckHandler();
358     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
359     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
360 #endif
361   }
362 }
363
364 // called by checkpoint mgr to restore an array element
365 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
366 {
367 #if CMK_MEM_CHECKPOINT
368   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
369   // m->index.print();
370   PUP::fromMem p(m->packData);
371   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
372   CmiAssert(mgr);
373 #if STREAMING_INFORMHOME
374   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
375 #else
376   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
377 #endif
378
379   // find a list of array elements bound together
380   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
381   CmiAssert(elt);
382   CkLocRec_local *rec = elt->myRec;
383   CkVec<CkMigratable *> list;
384   mgr->migratableList(rec, list);
385   CmiAssert(list.length() > 0);
386   for (int l=0; l<list.length(); l++) {
387     elt = (ArrayElement *)list[l];
388     elt->budPEs[0] = m->bud1;
389     elt->budPEs[1] = m->bud2;
390     //    reset, may not needed now
391     // for now.
392     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
393       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
394       if (c) c->redNo = 0;
395     }
396   }
397 #endif
398 }
399
400 // return 1 if pe is a crashed processor
401 int CkMemCheckPT::isFailed(int pe)
402 {
403   for (int i=0; i<failedPes.length(); i++)
404     if (failedPes[i] == pe) return 1;
405   return 0;
406 }
407
408 // add pe into history list of all failed processors
409 void CkMemCheckPT::failed(int pe)
410 {
411   if (isFailed(pe)) return;
412   failedPes.push_back(pe);
413 }
414
415 int CkMemCheckPT::totalFailed()
416 {
417   return failedPes.length();
418 }
419
420 // create an checkpoint entry for array element of aid with index.
421 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
422 {
423   // error check, no duplicate
424   int idx, len = ckTable.size();
425   for (idx=0; idx<len; idx++) {
426     CkCheckPTInfo *entry = ckTable[idx];
427     if (index == entry->index) {
428       if (loc == entry->locMgr) {
429           // bindTo array elements
430           return;
431       }
432         // for array inheritance, the following check may fail
433         // because ArrayElement constructor of all superclasses are called
434       if (aid == entry->aid) {
435         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
436         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
437       }
438     }
439   }
440   CkCheckPTInfo *newEntry;
441   if (where == CkCheckPoint_inMEM)
442     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
443   else
444     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
445   ckTable.push_back(newEntry);
446   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
447 }
448
449 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
450 {
451 #if !CMK_CHKP_ALL       
452   int buddy = msg->bud1;
453   if (buddy == CkMyPe()) buddy = msg->bud2;
454   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
455   recvData(msg);
456     // ack
457   thisProxy[buddy].gotData();
458 #else
459   recvArrayCheckpoint(msg);
460   thisProxy[msg->bud2].gotData();
461 #endif
462 }
463
464 // loop through my checkpoint table and ask checkpointed array elements
465 // to send me checkpoint data.
466 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
467 {
468   checkpointed = 1;
469   cpCallback = cb;
470   cpStarter = starter;
471   if (CkMyPe() == cpStarter) {
472     startTime = CmiWallTimer();
473     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
474   }
475 #if !CMK_CHKP_ALL
476   int len = ckTable.length();
477   for (int i=0; i<len; i++) {
478     CkCheckPTInfo *entry = ckTable[i];
479       // always let the bigger number processor send request
480     //if (CkMyPe() < entry->pNo) continue;
481       // always let the smaller number processor send request, may on same proc
482     if (!isMaster(entry->pNo)) continue;
483       // call inmem_checkpoint to the array element, ask it to send
484       // back checkpoint data via recvData().
485     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
486     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
487   }
488     // if my table is empty, then I am done
489   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
490 #else
491   startArrayCheckpoint();
492 #endif
493   // pack and send proc level data
494   sendProcData();
495 }
496
497 class MemElementPacker : public CkLocIterator{
498         private:
499                 CkLocMgr *locMgr;
500                 PUP::er &p;
501         public:
502                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
503                 void addLocation(CkLocation &loc){
504                         CkArrayIndexMax idx = loc.getIndex();
505                         CkGroupID gID = locMgr->ckGetGroupID();
506                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
507                         CmiAssert(elt);
508                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
509                         p|gID;
510                         p|idx;
511                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
512                 }
513 };
514
515 void CkMemCheckPT::pupAllElements(PUP::er &p){
516 #if CMK_CHKP_ALL
517         int numElements;
518         if(!p.isUnpacking()){
519                 numElements = CkCountArrayElements();
520         }
521         p | numElements;
522         if(!p.isUnpacking()){
523                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
524         }
525 #endif
526 }
527
528 void CkMemCheckPT::startArrayCheckpoint(){
529 #if CMK_CHKP_ALL
530         int size;
531         {
532                 PUP::sizer psizer;
533                 pupAllElements(psizer);
534                 size = psizer.size();
535         }
536         int packSize = size/sizeof(double)+1;
537  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
538         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
539         msg->len = size;
540         msg->cp_flag = 1;
541         int budPEs[2];
542         msg->bud1=CkMyPe();
543         msg->bud2=ChkptOnPe(CkMyPe());
544         {
545                 PUP::toMem p(msg->packData);
546                 pupAllElements(p);
547         }
548         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
549         chkpTable[0] = msg;
550         recvCount++;
551 #endif
552 }
553
554 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
555 {
556 #if CMK_CHKP_ALL
557         int idx = 1;
558         if(msg->bud1 == CkMyPe()){
559                 idx = 0;
560         }
561         int isChkpting = msg->cp_flag;
562         chkpTable[idx] = msg;
563         if(isChkpting){
564                 recvCount++;
565                 if(recvCount == 2){
566                   if (where == CkCheckPoint_inMEM) {
567                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
568                   }
569                   else if (where == CkCheckPoint_inDISK) {
570                         // another barrier for finalize the writing using fsync
571                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
572                         contribute(0,NULL,CkReduction::sum_int,localcb);
573                   }
574                   else
575                         CmiAbort("Unknown checkpoint scheme");
576                   recvCount = 0;
577                 }
578         }
579 #endif
580 }
581
582 // don't handle array elements
583 static inline void _handleProcData(PUP::er &p)
584 {
585     // save readonlys, and callback BTW
586     CkPupROData(p);
587
588     // save mainchares 
589     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
590         
591 #ifndef CMK_CHARE_USE_PTR
592     // save non-migratable chare
593     CkPupChareData(p);
594 #endif
595
596     // save groups into Groups.dat
597     CkPupGroupData(p);
598
599     // save nodegroups into NodeGroups.dat
600     if(CkMyRank()==0) CkPupNodeGroupData(p);
601 }
602
603 void CkMemCheckPT::sendProcData()
604 {
605   // find out size of buffer
606   int size;
607   {
608     PUP::sizer p;
609     _handleProcData(p);
610     size = p.size();
611   }
612   int packSize = size;
613   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
614   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
615   {
616     PUP::toMem p(msg->packData);
617     _handleProcData(p);
618   }
619   msg->pe = CkMyPe();
620   msg->len = size;
621   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
622   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
623 }
624
625 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
626 {
627   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
628   CpvAccess(procChkptBuf) = msg;
629   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
630   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
631 }
632
633 // ArrayElement call this function to give us the checkpointed data
634 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
635 {
636   int len = ckTable.length();
637   int idx;
638   for (idx=0; idx<len; idx++) {
639     CkCheckPTInfo *entry = ckTable[idx];
640     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
641   }
642   CkAssert(idx < len);
643   int isChkpting = msg->cp_flag;
644   ckTable[idx]->updateBuffer(msg);
645   if (isChkpting) {
646       // all my array elements have returned their inmem data
647       // inform starter processor that I am done.
648     recvCount ++;
649     if (recvCount == ckTable.length()) {
650       if (where == CkCheckPoint_inMEM) {
651         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
652       }
653       else if (where == CkCheckPoint_inDISK) {
654         // another barrier for finalize the writing using fsync
655         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
656         contribute(0,NULL,CkReduction::sum_int,localcb);
657       }
658       else
659         CmiAbort("Unknown checkpoint scheme");
660       recvCount = 0;
661     } 
662   }
663 }
664
665 // only used in disk checkpointing
666 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
667 {
668   delete m;
669 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
670   system("sync");
671 #endif
672   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
673 }
674
675 // only is called on cpStarter when checkpoint is done
676 void CkMemCheckPT::cpFinish()
677 {
678   CmiAssert(CkMyPe() == cpStarter);
679   peCount++;
680     // now that all processors have finished, activate callback
681   if (peCount == 2) {
682     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
683     cpCallback.send();
684     peCount = 0;
685     thisProxy.report();
686   }
687 }
688
689 // for debugging, report checkpoint info
690 void CkMemCheckPT::report()
691 {
692 #if !CMK_CHKP_ALL
693   int objsize = 0;
694   int len = ckTable.length();
695   for (int i=0; i<len; i++) {
696     CkCheckPTInfo *entry = ckTable[i];
697     CmiAssert(entry);
698     objsize += entry->getSize();
699   }
700   CmiAssert(CpvAccess(procChkptBuf));
701   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
702 #else
703   if(CkMyPe()==0)
704   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
705 #endif
706 }
707
708 /*****************************************************************************
709                         RESTART Procedure
710 *****************************************************************************/
711
712 // master processor of two buddies
713 inline int CkMemCheckPT::isMaster(int buddype)
714 {
715 #if 0
716   int mype = CkMyPe();
717 //CkPrintf("ismaster: %d %d\n", pe, mype);
718   if (CkNumPes() - totalFailed() == 2) {
719     return mype > buddype;
720   }
721   for (int i=1; i<CkNumPes(); i++) {
722     int me = (buddype+i)%CkNumPes();
723     if (isFailed(me)) continue;
724     if (me == mype) return 1;
725     else return 0;
726   }
727   return 0;
728 #else
729     // smaller one
730   int mype = CkMyPe();
731 //CkPrintf("ismaster: %d %d\n", pe, mype);
732   if (CkNumPes() - totalFailed() == 2) {
733     return mype < buddype;
734   }
735 #if NODE_CHECKPOINT
736   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
737   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
738 #else
739   for (int i=1; i<CkNumPes(); i++) {
740 #endif
741     int me = (mype+i)%CkNumPes();
742     if (isFailed(me)) continue;
743     if (me == buddype) return 1;
744     else return 0;
745   }
746   return 0;
747 #endif
748 }
749
750
751
752 #if 0
753 // helper class to pup all elements that belong to same ckLocMgr
754 class ElementDestoryer : public CkLocIterator {
755 private:
756         CkLocMgr *locMgr;
757 public:
758         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
759         void addLocation(CkLocation &loc) {
760                 CkArrayIndex idx=loc.getIndex();
761                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
762                 loc.destroy();
763         }
764 };
765 #endif
766
767 // restore the bitmap vector for LB
768 void CkMemCheckPT::resetLB(int diepe)
769 {
770 #if CMK_LBDB_ON
771   int i;
772   char *bitmap = new char[CkNumPes()];
773   // set processor available bitmap
774   get_avail_vector(bitmap);
775
776   for (i=0; i<failedPes.length(); i++)
777     bitmap[failedPes[i]] = 0; 
778   bitmap[diepe] = 0;
779
780 #if CK_NO_PROC_POOL
781   set_avail_vector(bitmap);
782 #endif
783
784   // if I am the crashed pe, rebuild my failedPEs array
785   if (CkMyNode() == diepe)
786     for (i=0; i<CkNumPes(); i++) 
787       if (bitmap[i]==0) failed(i);
788
789   delete [] bitmap;
790 #endif
791 }
792
793 // in case when failedPe dies, everybody go through its checkpoint table:
794 // destory all array elements
795 // recover lost buddies
796 // reconstruct all array elements from check point data
797 // called on all processors
798 void CkMemCheckPT::restart(int diePe)
799 {
800 #if CMK_MEM_CHECKPOINT
801   double curTime = CmiWallTimer();
802   if (CkMyPe() == diePe)
803     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
804   stage = (char*)"resetLB";
805   startTime = curTime;
806   if (CkMyPe() == diePe)
807     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
808
809 #if CK_NO_PROC_POOL
810   failed(diePe);        // add into the list of failed pes
811 #endif
812   thisFailedPe = diePe;
813
814   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
815
816   inRestarting = 1;
817                                                                                 
818   // disable load balancer's barrier
819   if (CkMyPe() != diePe) resetLB(diePe);
820
821   CKLOCMGR_LOOP(mgr->startInserting(););
822
823   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
824   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
825 /*
826   if (CkMyPe() == 0)
827     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
828 */
829 #endif
830 }
831
832 // loally remove all array elements
833 void CkMemCheckPT::removeArrayElements()
834 {
835 #if CMK_MEM_CHECKPOINT
836   int len = ckTable.length();
837   double curTime = CmiWallTimer();
838   if (CkMyPe() == thisFailedPe) 
839     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
840   stage = (char*)"removeArrayElements";
841   startTime = curTime;
842
843   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
844   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
845
846   // get rid of all buffering and remote recs
847   // including destorying all array elements
848   CKLOCMGR_LOOP(mgr->flushAllRecs(););
849
850 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
851
852   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
853   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
854 #endif
855 }
856
857 // flush state in reduction manager
858 void CkMemCheckPT::resetReductionMgr()
859 {
860   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
861   int numGroups = CkpvAccess(_groupIDTable)->size();
862   for(int i=0;i<numGroups;i++) {
863     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
864     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
865     obj->flushStates();
866     obj->ckJustMigrated();
867   }
868   // reset again
869   //CpvAccess(_qd)->flushStates();
870
871 #if 1
872   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
873   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
874 #else
875   if (CkMyPe() == 0)
876     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
877 #endif
878 }
879
880 // recover the lost buddies
881 void CkMemCheckPT::recoverBuddies()
882 {
883   int idx;
884   int len = ckTable.length();
885   // ready to flush reduction manager
886   // cannot be CkMemCheckPT::restart because destory will modify states
887   double curTime = CmiWallTimer();
888   if (CkMyPe() == thisFailedPe)
889   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
890   stage = (char *)"recoverBuddies";
891   if (CkMyPe() == thisFailedPe)
892   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
893   startTime = curTime;
894
895   // recover buddies
896   expectCount = 0;
897 #if !CMK_CHKP_ALL
898   for (idx=0; idx<len; idx++) {
899     CkCheckPTInfo *entry = ckTable[idx];
900     if (entry->pNo == thisFailedPe) {
901 #if CK_NO_PROC_POOL
902       // find a new buddy
903       int budPe = BuddyPE(CkMyPe());
904 #else
905       int budPe = thisFailedPe;
906 #endif
907       CkArrayCheckPTMessage *msg = entry->getCopy();
908       msg->bud1 = budPe;
909       msg->bud2 = CkMyPe();
910       msg->cp_flag = 0;            // not checkpointing
911       thisProxy[budPe].recoverEntry(msg);
912       expectCount ++;
913     }
914   }
915 #else
916   //send to failed pe
917   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
918 #if CK_NO_PROC_POOL
919       // find a new buddy
920       int budPe = BuddyPE(CkMyPe());
921 #else
922       int budPe = thisFailedPe;
923 #endif
924       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
925       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
926           msg->cp_flag = 0;            // not checkpointing
927       msg->bud1 = budPe;
928       msg->bud2 = CkMyPe();
929       thisProxy[budPe].recoverEntry(msg);
930       expectCount ++;
931   }
932 #endif
933
934 #if 1
935   if (expectCount == 0) {
936     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
937     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
938   }
939 #else
940   if (CkMyPe() == 0) {
941     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
942   }
943 #endif
944
945   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
946 }
947
948 void CkMemCheckPT::gotData()
949 {
950   ackCount ++;
951   if (ackCount == expectCount) {
952     ackCount = 0;
953     expectCount = -1;
954     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
955     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
956   }
957 }
958
959 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
960 {
961
962   for (int i=0; i<n; i++) {
963     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
964     mgr->updateLocation(idx[i], nowOnPe);
965   }
966         thisProxy[nowOnPe].gotReply();
967 }
968
969 // restore array elements
970 void CkMemCheckPT::recoverArrayElements()
971 {
972   double curTime = CmiWallTimer();
973   int len = ckTable.length();
974   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
975   stage = (char *)"recoverArrayElements";
976   if (CkMyPe() == thisFailedPe)
977   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
978   startTime = curTime;
979  int flag = 0;
980   // recover all array elements
981   int count = 0;
982 //#if STREAMING_INFORMHOME
983   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
984   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
985 //#endif
986 #if !CMK_CHKP_ALL
987   for (int idx=0; idx<len; idx++)
988   {
989     CkCheckPTInfo *entry = ckTable[idx];
990 #if CK_NO_PROC_POOL
991     // the bigger one will do 
992 //    if (CkMyPe() < entry->pNo) continue;
993     if (!isMaster(entry->pNo)) continue;
994 #else
995     // smaller one do it, which has the original object
996     if (CkMyPe() == entry->pNo+1 || 
997         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
998 #endif
999 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1000
1001     entry->updateBuddy(CkMyPe(), entry->pNo);
1002     CkArrayCheckPTMessage *msg = entry->getCopy();
1003     // gzheng
1004     //thisProxy[CkMyPe()].inmem_restore(msg);
1005     inmem_restore(msg);
1006 #if STREAMING_INFORMHOME
1007     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1008     int homePe = mgr->homePe(msg->index);
1009     if (homePe != CkMyPe()) {
1010       gmap[homePe].push_back(msg->locMgr);
1011       imap[homePe].push_back(msg->index);
1012     }
1013 #endif
1014     CkFreeMsg(msg);
1015     count ++;
1016   }
1017 #else
1018         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1019         recoverAll(msg,gmap,imap);
1020     CkFreeMsg(msg);
1021 #endif
1022   curTime = CmiWallTimer();
1023   if (CkMyPe() == thisFailedPe)
1024         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1025 #if STREAMING_INFORMHOME
1026   for (int i=0; i<CkNumPes(); i++) {
1027     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1028       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1029         flag++; 
1030           }
1031   }
1032 #endif
1033   delete [] imap;
1034   delete [] gmap;
1035   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1036
1037   CKLOCMGR_LOOP(mgr->doneInserting(););
1038
1039   // _crashedNode = -1;
1040   CpvAccess(_crashedNode) = -1;
1041 if(CkMyPe()!=thisFailedPe)
1042   inRestarting = 0;
1043  // if (CkMyPe() == 0)
1044    // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1045 if(flag == 0)
1046 {
1047     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1048 }
1049 }
1050
1051 void CkMemCheckPT::gotReply(){
1052     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1053 }
1054
1055 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1056 #if CMK_CHKP_ALL
1057         PUP::fromMem p(msg->packData);
1058         int numElements;
1059         p|numElements;
1060         if(p.isUnpacking()){
1061                 for(int i=0;i<numElements;i++){
1062                         CkGroupID gID;
1063                         CkArrayIndex idx;
1064                         p|gID;
1065                         p|idx;
1066                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1067                         int homePe = mgr->homePe(idx);
1068 #if STREAMING_INFORMHOME
1069                         mgr->resume(idx,p,CmiFalse);
1070 #else
1071                                 mgr->resume(idx,p,CmiFalse);
1072 #endif
1073                           /*CkLocRec_local *rec = loc.getLocalRecord();
1074                           CmiAssert(rec);
1075                           CkVec<CkMigratable *> list;
1076                           mgr->migratableList(rec, list);
1077                           CmiAssert(list.length() > 0);
1078                           for (int l=0; l<list.length(); l++) {
1079                                 ArrayElement * elt = (ArrayElement *)list[l];
1080                                 //    reset, may not needed now
1081                                 // for now.
1082                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1083                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1084                                   if (c) c->redNo = 0;
1085                                 }
1086                           }*/
1087 #if STREAMING_INFORMHOME
1088                         homePe = mgr->homePe(idx);
1089                         if (homePe != CkMyPe()) {
1090                           gmap[homePe].push_back(gID);
1091                           imap[homePe].push_back(idx);
1092                         }
1093 #endif
1094                 }
1095         }
1096 #endif
1097 }
1098
1099 static double restartT;
1100
1101 // on every processor
1102 // turn load balancer back on
1103 void CkMemCheckPT::finishUp()
1104 {
1105   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1106   //CKLOCMGR_LOOP(mgr->doneInserting(););
1107   
1108   if (CkMyPe() == thisFailedPe)
1109   {
1110         inRestarting=0;
1111        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1112        //CkStartQD(cpCallback);
1113        cpCallback.send();
1114        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1115   }
1116
1117 #if CK_NO_PROC_POOL
1118 #if NODE_CHECKPOINT
1119   int numnodes = CmiNumPhysicalNodes();
1120 #else
1121   int numnodes = CkNumPes();
1122 #endif
1123   if (numnodes-totalFailed() <=2) {
1124     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1125     _memChkptOn = 0;
1126   }
1127 #endif
1128 }
1129
1130 // called only on 0
1131 void CkMemCheckPT::quiescence(CkCallback &cb)
1132 {
1133   static int pe_count = 0;
1134   pe_count ++;
1135   CmiAssert(CkMyPe() == 0);
1136   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1137   if (pe_count == CkNumPes()) {
1138     pe_count = 0;
1139     cb.send();
1140   }
1141 }
1142
1143 // User callable function - to start a checkpoint
1144 // callback cb is used to pass control back
1145 void CkStartMemCheckpoint(CkCallback &cb)
1146 {
1147 #if CMK_MEM_CHECKPOINT
1148   if (_memChkptOn == 0) {
1149     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1150     cb.send();
1151     return;
1152   }
1153   if (CkInRestarting()) {
1154       // trying to checkpointing during restart
1155     cb.send();
1156     return;
1157   }
1158     // store user callback and user data
1159   CkMemCheckPT::cpCallback = cb;
1160
1161     // broadcast to start check pointing
1162   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1163   checkptMgr.doItNow(CkMyPe(), cb);
1164 #else
1165   // when mem checkpoint is disabled, invike cb immediately
1166   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1167   cb.send();
1168 #endif
1169 }
1170
1171 void CkRestartCheckPoint(int diePe)
1172 {
1173 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1174   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1175   // broadcast
1176   checkptMgr.restart(diePe);
1177 }
1178
1179 static int _diePE = -1;
1180
1181 // callback function used locally by ccs handler
1182 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1183 {
1184 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1185   CkRestartCheckPoint(_diePE);
1186 }
1187
1188 // Converse function handles
1189 static int askPhaseHandlerIdx;
1190 static int recvPhaseHandlerIdx;
1191 static int askProcDataHandlerIdx;
1192 static int restartBcastHandlerIdx;
1193 static int recoverProcDataHandlerIdx;
1194 static int restartBeginHandlerIdx;
1195 static int notifyHandlerIdx;
1196
1197 // called on crashed PE
1198 static void restartBeginHandler(char *msg)
1199 {
1200   CmiFree(msg);
1201 #if CMK_MEM_CHECKPOINT
1202 #if CMK_USE_BARRIER
1203         if(CkMyPe()!=_diePE){
1204                 printf("restar begin on %d\n",CkMyPe());
1205                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1206                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1207                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1208         }else{
1209         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1210         CkRestartCheckPointCallback(NULL, NULL);
1211         }
1212 #else
1213   static int count = 0;
1214   CmiAssert(CkMyPe() == _diePE);
1215   count ++;
1216   if (count == CkNumPes()) {
1217     CkRestartCheckPointCallback(NULL, NULL);
1218     count = 0;
1219   }
1220 #endif
1221 #endif
1222 }
1223
1224 extern void _discard_charm_message();
1225 extern void _resume_charm_message();
1226
1227 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1228         return data;
1229 }
1230
1231 static void restartBcastHandler(char *msg)
1232 {
1233 #if CMK_MEM_CHECKPOINT
1234   // advance phase counter
1235   CkMemCheckPT::inRestarting = 1;
1236   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1237   // gzheng
1238   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1239
1240   if (CkMyPe()==_diePE)
1241     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1242
1243   // reset QD counters
1244 /*  gzheng
1245   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1246 */
1247
1248 /*  gzheng
1249   if (CkMyPe()==_diePE)
1250       CkRestartCheckPointCallback(NULL, NULL);
1251 */
1252   CmiFree(msg);
1253
1254   _resume_charm_message();
1255
1256     // reduction
1257   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1258   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1259 #if CMK_USE_BARRIER
1260         //CmiPrintf("before reduce\n"); 
1261         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1262         //CmiPrintf("after reduce\n");  
1263 #else
1264   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1265 #endif 
1266  checkpointed = 0;
1267 #endif
1268 }
1269
1270 extern void _initDone();
1271 #include "hwi/include/bqc/A2_inlines.h"
1272
1273 // called on crashed processor
1274 static void recoverProcDataHandler(char *msg)
1275 {
1276 #if CMK_MEM_CHECKPOINT
1277    int i;
1278    envelope *env = (envelope *)msg;
1279    CkUnpackMessage(&env);
1280    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1281    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1282    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1283    //cur_restart_phase ++;
1284      // gzheng ?
1285    //CpvAccess(_qd)->flushStates();
1286
1287    // restore readonly, mainchare, group, nodegroup
1288 //   int temp = cur_restart_phase;
1289 //   cur_restart_phase = -1;
1290    PUP::fromMem p(procMsg->packData);
1291    _handleProcData(p);
1292
1293    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1294    // gzheng
1295    CKLOCMGR_LOOP(mgr->startInserting(););
1296
1297    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1298    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1299    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1300    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1301
1302    _initDone();
1303 //   CpvAccess(_qd)->flushStates();
1304    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1305 #endif
1306 }
1307
1308 // called on its backup processor
1309 // get backup message buffer and sent to crashed processor
1310 static void askProcDataHandler(char *msg)
1311 {
1312 #if CMK_MEM_CHECKPOINT
1313     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1314     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1315     if (CpvAccess(procChkptBuf) == NULL)  {
1316       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1317       CkAbort("no checkpoint found");
1318     }
1319     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1320     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1321
1322     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1323
1324     CkPackMessage(&env);
1325     CmiSetHandler(env, recoverProcDataHandlerIdx);
1326     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1327     CpvAccess(procChkptBuf) = NULL;
1328     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1329 #endif
1330 }
1331
1332 // called on PE 0
1333 void qd_callback(void *m)
1334 {
1335    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1336    CkFreeMsg(m);
1337 #ifdef CMK_SMP
1338    for(int i=0;i<CmiMyNodeSize();i++){
1339    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1340    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1341         CmiSetHandler(msg, askProcDataHandlerIdx);
1342         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1343         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1344    }
1345    return;
1346 #endif
1347    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1348    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1349    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1350    CmiSetHandler(msg, askProcDataHandlerIdx);
1351    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1352    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1353
1354 }
1355
1356 // on crashed node
1357 void CkMemRestart(const char *dummy, CkArgMsg *args)
1358 {
1359 #if CMK_MEM_CHECKPOINT
1360    _diePE = CmiMyNode();
1361    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1362    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1363    CkMemCheckPT::inRestarting = 1;
1364
1365   CpvAccess( _crashedNode )= CmiMyNode();
1366         
1367   _discard_charm_message();
1368     restartT = CmiWallTimer();
1369    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1370   
1371   /*if(CmiMyRank()==0){
1372     CkCallback cb(qd_callback);
1373     CkStartQD(cb);
1374     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1375   }*/
1376    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1377    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1378    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1379    CmiSetHandler(msg, askProcDataHandlerIdx);
1380    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1381    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1382 #else
1383    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1384 #endif
1385 }
1386
1387 // can be called in other files
1388 // return true if it is in restarting
1389 extern "C"
1390 int CkInRestarting()
1391 {
1392 #if CMK_MEM_CHECKPOINT
1393   if (CpvAccess( _crashedNode)!=-1) return 1;
1394   // gzheng
1395   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1396   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1397   return CkMemCheckPT::inRestarting;
1398 #else
1399   return 0;
1400 #endif
1401 }
1402
1403 extern "C"
1404 void CkSetInLdb(){
1405 #if CMK_MEM_CHECKPOINT
1406         CkMemCheckPT::inLoadbalancing = 1;
1407 #endif
1408 }
1409
1410 extern "C"
1411 int CkInLdb(){
1412 #if CMK_MEM_CHECKPOINT
1413         return CkMemCheckPT::inLoadbalancing;
1414 #endif
1415 }
1416
1417 extern "C"
1418 void CkResetInLdb(){
1419 #if CMK_MEM_CHECKPOINT
1420         CkMemCheckPT::inLoadbalancing = 0;
1421 #endif
1422 }
1423
1424 /*****************************************************************************
1425                 module initialization
1426 *****************************************************************************/
1427
1428 static int arg_where = CkCheckPoint_inMEM;
1429
1430 #if CMK_MEM_CHECKPOINT
1431 void init_memcheckpt(char **argv)
1432 {
1433     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1434       arg_where = CkCheckPoint_inDISK;
1435     }
1436
1437         // initiliazing _crashedNode variable
1438         CpvInitialize(int, _crashedNode);
1439         CpvAccess(_crashedNode) = -1;
1440
1441 }
1442 #endif
1443
1444 class CkMemCheckPTInit: public Chare {
1445 public:
1446   CkMemCheckPTInit(CkArgMsg *m) {
1447 #if CMK_MEM_CHECKPOINT
1448     if (arg_where == CkCheckPoint_inDISK) {
1449       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1450     }
1451     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1452     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1453 #endif
1454   }
1455 };
1456
1457 static void notifyHandler(char *msg)
1458 {
1459 #if CMK_MEM_CHECKPOINT
1460   CmiFree(msg);
1461       /* immediately increase restart phase to filter old messages */
1462   CpvAccess(_curRestartPhase) ++;
1463   CpvAccess(_qd)->flushStates();
1464   _discard_charm_message();
1465
1466 #endif
1467 }
1468
1469 extern "C"
1470 void notify_crash(int node)
1471 {
1472 #ifdef CMK_MEM_CHECKPOINT
1473   CpvAccess( _crashedNode) = node;
1474 #ifdef CMK_SMP
1475   for(int i=0;i<CkMyNodeSize();i++){
1476         CpvAccessOther(_crashedNode,i)=node;
1477   }
1478 #endif
1479   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1480   CkMemCheckPT::inRestarting = 1;
1481
1482     // this may be in interrupt handler, send a message to reset QD
1483   int pe = CmiNodeFirst(CkMyNode());
1484   for(int i=0;i<CkMyNodeSize();i++){
1485         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1486         CmiSetHandler(msg, notifyHandlerIdx);
1487         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1488   }
1489 #endif
1490 }
1491
1492 extern "C" void (*notify_crash_fn)(int node);
1493
1494 #if CMK_CONVERSE_MPI
1495 static int pingHandlerIdx;
1496 static int pingCheckHandlerIdx;
1497 static int buddyDieHandlerIdx;
1498 static double lastPingTime = -1;
1499
1500 extern "C" void mpi_restart_crashed(int pe, int rank);
1501 extern "C" int  find_spare_mpirank(int pe);
1502
1503 void pingBuddy();
1504 void pingCheckHandler();
1505
1506 void buddyDieHandler(char *msg)
1507 {
1508 #if CMK_MEM_CHECKPOINT
1509    // notify
1510    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1511    notify_crash(diepe);
1512    // send message to crash pe to let it restart
1513    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1514    int newrank = find_spare_mpirank(diepe);
1515    int buddy = obj->BuddyPE(CmiMyPe());
1516    if (buddy == diepe)  {
1517      mpi_restart_crashed(diepe, newrank);
1518      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1519    }
1520 #endif
1521 }
1522
1523 void pingHandler(void *msg)
1524 {
1525   lastPingTime = CmiWallTimer();
1526   CmiFree(msg);
1527 }
1528
1529 void pingCheckHandler()
1530 {
1531 #if CMK_MEM_CHECKPOINT
1532   double now = CmiWallTimer();
1533   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1534     int i, pe, buddy;
1535     // tell everyone the buddy dies
1536     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1537     for (i = 1; i < CmiNumPes(); i++) {
1538        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1539        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1540     }
1541     buddy = pe;
1542     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1543     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1544       if (obj->isFailed(pe) || pe == buddy) continue;
1545       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1546       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1547       CmiSetHandler(msg, buddyDieHandlerIdx);
1548       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1549     }*/
1550     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1551     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1552     CmiSetHandler(msg, buddyDieHandlerIdx);
1553     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1554   }
1555   else 
1556     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1557 #endif
1558 }
1559
1560 void pingBuddy()
1561 {
1562 #if CMK_MEM_CHECKPOINT
1563   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1564   if (obj) {
1565     int buddy = obj->BuddyPE(CkMyPe());
1566 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1567     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1568     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1569     CmiSetHandler(msg, pingHandlerIdx);
1570     CmiGetRestartPhase(msg) = 9999;
1571     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1572   }
1573   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1574 #endif
1575 }
1576 #endif
1577
1578 // initproc
1579 void CkRegisterRestartHandler( )
1580 {
1581 #if CMK_MEM_CHECKPOINT
1582   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1583   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1584   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1585   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1586   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1587
1588 #if CMK_CONVERSE_MPI
1589   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1590   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1591   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1592 #endif
1593
1594   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1595   CpvAccess(procChkptBuf) = NULL;
1596
1597   notify_crash_fn = notify_crash;
1598
1599 #if ! CMK_CONVERSE_MPI
1600   // print pid to kill
1601 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1602 //  sleep(4);
1603 #endif
1604 #endif
1605 }
1606
1607
1608 extern "C"
1609 int CkHasCheckpoints()
1610 {
1611   return checkpointed;
1612 }
1613
1614 /// @todo: the following definitions should be moved to a separate file containing
1615 // structures and functions about fault tolerance strategies
1616
1617 /**
1618  *  * @brief: function for killing a process                                             
1619  *   */
1620 #ifdef CMK_MEM_CHECKPOINT
1621 #if CMK_HAS_GETPID
1622 void killLocal(void *_dummy,double curWallTime){
1623         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1624         if(CmiWallTimer()<killTime-1){
1625                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1626         }else{ 
1627 #if CMK_CONVERSE_MPI
1628                                 CkDieNow();
1629 #else 
1630                 kill(getpid(),SIGKILL);                                               
1631 #endif
1632         }              
1633
1634 #else
1635 void killLocal(void *_dummy,double curWallTime){
1636   CmiAbort("kill() not supported!");
1637 }
1638 #endif
1639 #endif
1640
1641 #ifdef CMK_MEM_CHECKPOINT
1642 /**
1643  * @brief: reads the file with the kill information
1644  */
1645 void readKillFile(){
1646         FILE *fp=fopen(killFile,"r");
1647         if(!fp){
1648                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1649                 return;
1650         }
1651         int proc;
1652         double sec;
1653         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1654                 if(proc == CkMyNode() && CkMyRank() == 0){
1655                         killTime = CmiWallTimer()+sec;
1656                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1657                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1658                 }
1659         }
1660         fclose(fp);
1661 }
1662
1663 #if ! CMK_CONVERSE_MPI
1664 void CkDieNow()
1665 {
1666 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1667          // ignored for non-mpi version
1668         CmiPrintf("[%d] die now.\n", CmiMyPe());
1669         killTime = CmiWallTimer()+0.001;
1670         CcdCallFnAfter(killLocal,NULL,1);
1671 #endif
1672 }
1673 #endif
1674
1675 #endif
1676
1677 #include "CkMemCheckpoint.def.h"
1678
1679