minor
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         1
70 #define STREAMING_INFORMHOME                    0
71 CpvDeclare(int, _crashedNode);
72
73 // static, so that it is accessible from Converse part
74 int CkMemCheckPT::inRestarting = 0;
75 int CkMemCheckPT::inLoadbalancing = 0;
76 double CkMemCheckPT::startTime;
77 char *CkMemCheckPT::stage;
78 CkCallback CkMemCheckPT::cpCallback;
79
80 int _memChkptOn = 1;                    // checkpoint is on or off
81
82 CkGroupID ckCheckPTGroupID;             // readonly
83
84 static int checkpointed = 0;
85
86 /// @todo the following declarations should be moved into a separate file for all 
87 // fault tolerant strategies
88
89 #ifdef CMK_MEM_CHECKPOINT
90 // name of the kill file that contains processes to be killed 
91 char *killFile;                                               
92 // flag for the kill file         
93 int killFlag=0;
94 // variable for storing the killing time
95 double killTime=0.0;
96 #endif
97
98 #ifdef CKLOCMGR_LOOP
99 #undef CKLOCMGR_LOOP
100 #endif
101 // loop over all CkLocMgr and do "code"
102 #define  CKLOCMGR_LOOP(code)    {       \
103   int numGroups = CkpvAccess(_groupIDTable)->size();    \
104   for(int i=0;i<numGroups;i++) {        \
105     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
106     if(obj->isLocMgr())  {      \
107       CkLocMgr *mgr = (CkLocMgr*)obj;   \
108       code      \
109     }   \
110   }     \
111  }
112
113 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
114 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
115
116 // compute the backup processor
117 // FIXME: avoid crashed processors
118 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
119
120 inline int CkMemCheckPT::BuddyPE(int pe)
121 {
122   int budpe;
123 #if NODE_CHECKPOINT
124     // buddy is the processor with same rank on the next physical node
125   int r1 = CmiPhysicalRank(pe);
126   int budnode = CmiPhysicalNodeID(pe);
127   do {
128     budnode = (budnode+1)%CmiNumPhysicalNodes();
129     int *pelist;
130     int num;
131     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
132     budpe = pelist[r1 % num];
133   } while (isFailed(budpe));
134   if (budpe == pe) {
135     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
136     CmiAbort("Failed to find a buddy processor");
137   }
138 #else
139   budpe = pe;
140   while (budpe == pe || isFailed(budpe)) 
141           budpe = (budpe+1)%CkNumPes();
142 #endif
143   return budpe;
144 }
145
146 // called in array element constructor
147 // choose and register with 2 buddies for checkpoiting 
148 #if CMK_MEM_CHECKPOINT
149 void ArrayElement::init_checkpt() {
150         if (_memChkptOn == 0) return;
151         if (CkInRestarting()) {
152           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
153         }
154         // only master init checkpoint
155         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
156
157         budPEs[0] = CkMyPe();
158         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
159         CmiAssert(budPEs[0] != budPEs[1]);
160         // inform checkPTMgr
161         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
162         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
163         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
164         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
165 }
166 #endif
167
168 // entry function invoked by checkpoint mgr asking for checkpoint data
169 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
170 #if CMK_MEM_CHECKPOINT
171 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
172 //char index[128];   thisIndexMax.sprint(index);
173 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
174   CkLocMgr *locMgr = thisArray->getLocMgr();
175   CmiAssert(myRec!=NULL);
176   int size;
177   {
178         PUP::sizer p;
179         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
180         size = p.size();
181   }
182   int packSize = size/sizeof(double) +1;
183   CkArrayCheckPTMessage *msg =
184                  new (packSize, 0) CkArrayCheckPTMessage;
185   msg->len = size;
186   msg->index =thisIndexMax;
187   msg->aid = thisArrayID;
188   msg->locMgr = locMgr->getGroupID();
189   msg->cp_flag = 1;
190   {
191         PUP::toMem p(msg->packData);
192         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
193   }
194
195   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
196   checkptMgr.recvData(msg, 2, budPEs);
197   delete m;
198 #endif
199 }
200
201 // checkpoint holder class - for memory checkpointing
202 class CkMemCheckPTInfo: public CkCheckPTInfo
203 {
204   CkArrayCheckPTMessage *ckBuffer;
205 public:
206   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
207                     CkCheckPTInfo(a, loc, idx, pno)
208   {
209     ckBuffer = NULL;
210   }
211   ~CkMemCheckPTInfo() 
212   {
213     if (ckBuffer) delete ckBuffer; 
214   }
215   inline void updateBuffer(CkArrayCheckPTMessage *data) 
216   {
217     CmiAssert(data!=NULL);
218     if (ckBuffer) delete ckBuffer;
219     ckBuffer = data;
220   }    
221   inline CkArrayCheckPTMessage * getCopy()
222   {
223     if (ckBuffer == NULL) {
224       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
225       CmiAbort("Abort!");
226     }
227     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
228   }     
229   inline void updateBuddy(int b1, int b2) {
230      CmiAssert(ckBuffer);
231      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
232      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
233      CmiAssert(pNo != CkMyPe());
234   }
235   inline int getSize() { 
236      CmiAssert(ckBuffer);
237      return ckBuffer->len; 
238   }
239 };
240
241 // checkpoint holder class - for in-disk checkpointing
242 class CkDiskCheckPTInfo: public CkCheckPTInfo 
243 {
244   char *fname;
245   int bud1, bud2;
246   int len;                      // checkpoint size
247 public:
248   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
249   {
250 #if CMK_USE_MKSTEMP
251     fname = new char[64];
252     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
253     mkstemp(fname);
254 #else
255     fname=tmpnam(NULL);
256 #endif
257     bud1 = bud2 = -1;
258     len = 0;
259   }
260   ~CkDiskCheckPTInfo() 
261   {
262     remove(fname);
263   }
264   inline void updateBuffer(CkArrayCheckPTMessage *data) 
265   {
266     double t = CmiWallTimer();
267     // unpack it
268     envelope *env = UsrToEnv(data);
269     CkUnpackMessage(&env);
270     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
271     FILE *f = fopen(fname,"wb");
272     PUP::toDisk p(f);
273     CkPupMessage(p, (void **)&data);
274     // delay sync to the end because otherwise the messages are blocked
275 //    fsync(fileno(f));
276     fclose(f);
277     bud1 = data->bud1;
278     bud2 = data->bud2;
279     len = data->len;
280     delete data;
281     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
282   }
283   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
284   {
285     CkArrayCheckPTMessage *data;
286     FILE *f = fopen(fname,"rb");
287     PUP::fromDisk p(f);
288     CkPupMessage(p, (void **)&data);
289     fclose(f);
290     data->bud1 = bud1;                          // update the buddies
291     data->bud2 = bud2;
292     return data;
293   }
294   inline void updateBuddy(int b1, int b2) {
295      bud1 = b1; bud2 = b2;
296      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
297      CmiAssert(pNo != CkMyPe());
298   }
299   inline int getSize() { 
300      return len; 
301   }
302 };
303
304 CkMemCheckPT::CkMemCheckPT(int w)
305 {
306   int numnodes = 0;
307 #if NODE_CHECKPOINT
308   numnodes = CmiNumPhysicalNodes();
309 #else
310   numnodes = CkNumPes();
311 #endif
312 #if CK_NO_PROC_POOL
313   if (numnodes <= 2)
314 #else
315   if (numnodes  == 1)
316 #endif
317   {
318     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
319     _memChkptOn = 0;
320   }
321   inRestarting = 0;
322   recvCount = peCount = 0;
323   ackCount = 0;
324   expectCount = -1;
325   where = w;
326
327 #if CMK_CONVERSE_MPI
328   void pingBuddy();
329   void pingCheckHandler();
330   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
331   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
332 #endif
333 }
334
335 CkMemCheckPT::~CkMemCheckPT()
336 {
337   int len = ckTable.length();
338   for (int i=0; i<len; i++) {
339     delete ckTable[i];
340   }
341 }
342
343 void CkMemCheckPT::pup(PUP::er& p) 
344
345   CBase_CkMemCheckPT::pup(p); 
346   p|cpStarter;
347   p|thisFailedPe;
348   p|failedPes;
349   p|ckCheckPTGroupID;           // recover global variable
350   p|cpCallback;                 // store callback
351   p|where;                      // where to checkpoint
352   p|peCount;
353   if (p.isUnpacking()) {
354     recvCount = 0;
355 #if CMK_CONVERSE_MPI
356     void pingBuddy();
357     void pingCheckHandler();
358     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
359     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
360 #endif
361   }
362 }
363
364 // called by checkpoint mgr to restore an array element
365 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
366 {
367 #if CMK_MEM_CHECKPOINT
368   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
369   // m->index.print();
370   PUP::fromMem p(m->packData);
371   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
372   CmiAssert(mgr);
373 #if STREAMING_INFORMHOME
374   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
375 #else
376   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
377 #endif
378
379   // find a list of array elements bound together
380   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
381   CmiAssert(elt);
382   CkLocRec_local *rec = elt->myRec;
383   CkVec<CkMigratable *> list;
384   mgr->migratableList(rec, list);
385   CmiAssert(list.length() > 0);
386   for (int l=0; l<list.length(); l++) {
387     elt = (ArrayElement *)list[l];
388     elt->budPEs[0] = m->bud1;
389     elt->budPEs[1] = m->bud2;
390     //    reset, may not needed now
391     // for now.
392     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
393       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
394       if (c) c->redNo = 0;
395     }
396   }
397 #endif
398 }
399
400 // return 1 if pe is a crashed processor
401 int CkMemCheckPT::isFailed(int pe)
402 {
403   for (int i=0; i<failedPes.length(); i++)
404     if (failedPes[i] == pe) return 1;
405   return 0;
406 }
407
408 // add pe into history list of all failed processors
409 void CkMemCheckPT::failed(int pe)
410 {
411   if (isFailed(pe)) return;
412   failedPes.push_back(pe);
413 }
414
415 int CkMemCheckPT::totalFailed()
416 {
417   return failedPes.length();
418 }
419
420 // create an checkpoint entry for array element of aid with index.
421 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
422 {
423   // error check, no duplicate
424   int idx, len = ckTable.size();
425   for (idx=0; idx<len; idx++) {
426     CkCheckPTInfo *entry = ckTable[idx];
427     if (index == entry->index) {
428       if (loc == entry->locMgr) {
429           // bindTo array elements
430           return;
431       }
432         // for array inheritance, the following check may fail
433         // because ArrayElement constructor of all superclasses are called
434       if (aid == entry->aid) {
435         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
436         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
437       }
438     }
439   }
440   CkCheckPTInfo *newEntry;
441   if (where == CkCheckPoint_inMEM)
442     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
443   else
444     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
445   ckTable.push_back(newEntry);
446   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
447 }
448
449 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
450 {
451 #if !CMK_CHKP_ALL       
452   int buddy = msg->bud1;
453   if (buddy == CkMyPe()) buddy = msg->bud2;
454   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
455   recvData(msg);
456     // ack
457   thisProxy[buddy].gotData();
458 #else
459   recvArrayCheckpoint(msg);
460   thisProxy[msg->bud2].gotData();
461 #endif
462 }
463
464 // loop through my checkpoint table and ask checkpointed array elements
465 // to send me checkpoint data.
466 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
467 {
468   checkpointed = 1;
469   cpCallback = cb;
470   cpStarter = starter;
471   if (CkMyPe() == cpStarter) {
472     startTime = CmiWallTimer();
473     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
474   }
475 #if !CMK_CHKP_ALL
476   int len = ckTable.length();
477   for (int i=0; i<len; i++) {
478     CkCheckPTInfo *entry = ckTable[i];
479       // always let the bigger number processor send request
480     //if (CkMyPe() < entry->pNo) continue;
481       // always let the smaller number processor send request, may on same proc
482     if (!isMaster(entry->pNo)) continue;
483       // call inmem_checkpoint to the array element, ask it to send
484       // back checkpoint data via recvData().
485     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
486     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
487   }
488     // if my table is empty, then I am done
489   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
490 #else
491   startArrayCheckpoint();
492 #endif
493   // pack and send proc level data
494   sendProcData();
495 }
496
497 class MemElementPacker : public CkLocIterator{
498         private:
499                 CkLocMgr *locMgr;
500                 PUP::er &p;
501         public:
502                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
503                 void addLocation(CkLocation &loc){
504                         CkArrayIndexMax idx = loc.getIndex();
505                         CkGroupID gID = locMgr->ckGetGroupID();
506                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
507                         CmiAssert(elt);
508                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
509                         p|gID;
510                         p|idx;
511                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
512                 }
513 };
514
515 void CkMemCheckPT::pupAllElements(PUP::er &p){
516 #if CMK_CHKP_ALL
517         int numElements;
518         if(!p.isUnpacking()){
519                 numElements = CkCountArrayElements();
520         }
521         p | numElements;
522         if(!p.isUnpacking()){
523                 //CKLOCMGR_LOOP(mgr->checkpointRemoteIdx(p););
524                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
525         }
526 #endif
527 }
528
529 void CkMemCheckPT::startArrayCheckpoint(){
530 #if CMK_CHKP_ALL
531         int size;
532         {
533                 PUP::sizer psizer;
534                 pupAllElements(psizer);
535                 size = psizer.size();
536         }
537         int packSize = size/sizeof(double)+1;
538  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
539         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
540         msg->len = size;
541         msg->cp_flag = 1;
542         int budPEs[2];
543         msg->bud1=CkMyPe();
544         msg->bud2=ChkptOnPe(CkMyPe());
545         {
546                 PUP::toMem p(msg->packData);
547                 pupAllElements(p);
548         }
549         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
550         chkpTable[0] = msg;
551         recvCount++;
552 #endif
553 }
554
555 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
556 {
557 #if CMK_CHKP_ALL
558         int idx = 1;
559         if(msg->bud1 == CkMyPe()){
560                 idx = 0;
561         }
562         int isChkpting = msg->cp_flag;
563         chkpTable[idx] = msg;
564         if(isChkpting){
565                 recvCount++;
566                 if(recvCount == 2){
567                   if (where == CkCheckPoint_inMEM) {
568                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
569                   }
570                   else if (where == CkCheckPoint_inDISK) {
571                         // another barrier for finalize the writing using fsync
572                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
573                         contribute(0,NULL,CkReduction::sum_int,localcb);
574                   }
575                   else
576                         CmiAbort("Unknown checkpoint scheme");
577                   recvCount = 0;
578                 }
579         }
580 #endif
581 }
582
583 // don't handle array elements
584 static inline void _handleProcData(PUP::er &p)
585 {
586     // save readonlys, and callback BTW
587     CkPupROData(p);
588
589     // save mainchares 
590     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
591         
592 #ifndef CMK_CHARE_USE_PTR
593     // save non-migratable chare
594     CkPupChareData(p);
595 #endif
596
597     // save groups into Groups.dat
598     CkPupGroupData(p);
599
600     // save nodegroups into NodeGroups.dat
601     if(CkMyRank()==0) CkPupNodeGroupData(p);
602 }
603
604 void CkMemCheckPT::sendProcData()
605 {
606   // find out size of buffer
607   int size;
608   {
609     PUP::sizer p;
610     _handleProcData(p);
611     size = p.size();
612   }
613   int packSize = size;
614   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
615   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
616   {
617     PUP::toMem p(msg->packData);
618     _handleProcData(p);
619   }
620   msg->pe = CkMyPe();
621   msg->len = size;
622   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
623   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
624 }
625
626 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
627 {
628   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
629   CpvAccess(procChkptBuf) = msg;
630   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
631   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
632 }
633
634 // ArrayElement call this function to give us the checkpointed data
635 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
636 {
637   int len = ckTable.length();
638   int idx;
639   for (idx=0; idx<len; idx++) {
640     CkCheckPTInfo *entry = ckTable[idx];
641     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
642   }
643   CkAssert(idx < len);
644   int isChkpting = msg->cp_flag;
645   ckTable[idx]->updateBuffer(msg);
646   if (isChkpting) {
647       // all my array elements have returned their inmem data
648       // inform starter processor that I am done.
649     recvCount ++;
650     if (recvCount == ckTable.length()) {
651       if (where == CkCheckPoint_inMEM) {
652         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
653       }
654       else if (where == CkCheckPoint_inDISK) {
655         // another barrier for finalize the writing using fsync
656         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
657         contribute(0,NULL,CkReduction::sum_int,localcb);
658       }
659       else
660         CmiAbort("Unknown checkpoint scheme");
661       recvCount = 0;
662     } 
663   }
664 }
665
666 // only used in disk checkpointing
667 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
668 {
669   delete m;
670 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
671   system("sync");
672 #endif
673   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
674 }
675
676 // only is called on cpStarter when checkpoint is done
677 void CkMemCheckPT::cpFinish()
678 {
679   CmiAssert(CkMyPe() == cpStarter);
680   peCount++;
681     // now that all processors have finished, activate callback
682   if (peCount == 2) {
683     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
684     cpCallback.send();
685     peCount = 0;
686 #if !CMK_CHKP_ALL
687     thisProxy.report();
688 #endif
689   }
690 }
691
692 // for debugging, report checkpoint info
693 void CkMemCheckPT::report()
694 {
695   int objsize = 0;
696   int len = ckTable.length();
697   for (int i=0; i<len; i++) {
698     CkCheckPTInfo *entry = ckTable[i];
699     CmiAssert(entry);
700     objsize += entry->getSize();
701   }
702   CmiAssert(CpvAccess(procChkptBuf));
703 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
704 }
705
706 /*****************************************************************************
707                         RESTART Procedure
708 *****************************************************************************/
709
710 // master processor of two buddies
711 inline int CkMemCheckPT::isMaster(int buddype)
712 {
713 #if 0
714   int mype = CkMyPe();
715 //CkPrintf("ismaster: %d %d\n", pe, mype);
716   if (CkNumPes() - totalFailed() == 2) {
717     return mype > buddype;
718   }
719   for (int i=1; i<CkNumPes(); i++) {
720     int me = (buddype+i)%CkNumPes();
721     if (isFailed(me)) continue;
722     if (me == mype) return 1;
723     else return 0;
724   }
725   return 0;
726 #else
727     // smaller one
728   int mype = CkMyPe();
729 //CkPrintf("ismaster: %d %d\n", pe, mype);
730   if (CkNumPes() - totalFailed() == 2) {
731     return mype < buddype;
732   }
733 #if NODE_CHECKPOINT
734   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
735   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
736 #else
737   for (int i=1; i<CkNumPes(); i++) {
738 #endif
739     int me = (mype+i)%CkNumPes();
740     if (isFailed(me)) continue;
741     if (me == buddype) return 1;
742     else return 0;
743   }
744   return 0;
745 #endif
746 }
747
748
749
750 #if 0
751 // helper class to pup all elements that belong to same ckLocMgr
752 class ElementDestoryer : public CkLocIterator {
753 private:
754         CkLocMgr *locMgr;
755 public:
756         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
757         void addLocation(CkLocation &loc) {
758                 CkArrayIndex idx=loc.getIndex();
759                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
760                 loc.destroy();
761         }
762 };
763 #endif
764
765 // restore the bitmap vector for LB
766 void CkMemCheckPT::resetLB(int diepe)
767 {
768 #if CMK_LBDB_ON
769   int i;
770   char *bitmap = new char[CkNumPes()];
771   // set processor available bitmap
772   get_avail_vector(bitmap);
773
774   for (i=0; i<failedPes.length(); i++)
775     bitmap[failedPes[i]] = 0; 
776   bitmap[diepe] = 0;
777
778 #if CK_NO_PROC_POOL
779   set_avail_vector(bitmap);
780 #endif
781
782   // if I am the crashed pe, rebuild my failedPEs array
783   if (CkMyNode() == diepe)
784     for (i=0; i<CkNumPes(); i++) 
785       if (bitmap[i]==0) failed(i);
786
787   delete [] bitmap;
788 #endif
789 }
790
791 // in case when failedPe dies, everybody go through its checkpoint table:
792 // destory all array elements
793 // recover lost buddies
794 // reconstruct all array elements from check point data
795 // called on all processors
796 void CkMemCheckPT::restart(int diePe)
797 {
798 #if CMK_MEM_CHECKPOINT
799   double curTime = CmiWallTimer();
800   if (CkMyPe() == diePe)
801     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
802   stage = (char*)"resetLB";
803   startTime = curTime;
804   if (CkMyPe() == diePe)
805     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
806
807 #if CK_NO_PROC_POOL
808   failed(diePe);        // add into the list of failed pes
809 #endif
810   thisFailedPe = diePe;
811
812   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
813
814   inRestarting = 1;
815                                                                                 
816   // disable load balancer's barrier
817   if (CkMyPe() != diePe) resetLB(diePe);
818
819   CKLOCMGR_LOOP(mgr->startInserting(););
820
821   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
822   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
823 /*
824   if (CkMyPe() == 0)
825     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
826 */
827 #endif
828 }
829
830 // loally remove all array elements
831 void CkMemCheckPT::removeArrayElements()
832 {
833 #if CMK_MEM_CHECKPOINT
834   int len = ckTable.length();
835   double curTime = CmiWallTimer();
836   if (CkMyPe() == thisFailedPe) 
837     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
838   stage = (char*)"removeArrayElements";
839   startTime = curTime;
840
841   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
842   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
843
844   // get rid of all buffering and remote recs
845   // including destorying all array elements
846   CKLOCMGR_LOOP(mgr->flushAllRecs(););
847
848 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
849
850   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
851   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
852 #endif
853 }
854
855 // flush state in reduction manager
856 void CkMemCheckPT::resetReductionMgr()
857 {
858   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
859   int numGroups = CkpvAccess(_groupIDTable)->size();
860   for(int i=0;i<numGroups;i++) {
861     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
862     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
863     obj->flushStates();
864     obj->ckJustMigrated();
865   }
866   // reset again
867   //CpvAccess(_qd)->flushStates();
868
869 #if 1
870   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
871   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
872 #else
873   if (CkMyPe() == 0)
874     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
875 #endif
876 }
877
878 // recover the lost buddies
879 void CkMemCheckPT::recoverBuddies()
880 {
881   int idx;
882   int len = ckTable.length();
883   // ready to flush reduction manager
884   // cannot be CkMemCheckPT::restart because destory will modify states
885   double curTime = CmiWallTimer();
886   if (CkMyPe() == thisFailedPe)
887   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
888   stage = (char *)"recoverBuddies";
889   if (CkMyPe() == thisFailedPe)
890   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
891   startTime = curTime;
892
893   // recover buddies
894   expectCount = 0;
895 #if !CMK_CHKP_ALL
896   for (idx=0; idx<len; idx++) {
897     CkCheckPTInfo *entry = ckTable[idx];
898     if (entry->pNo == thisFailedPe) {
899 #if CK_NO_PROC_POOL
900       // find a new buddy
901       int budPe = BuddyPE(CkMyPe());
902 #else
903       int budPe = thisFailedPe;
904 #endif
905       CkArrayCheckPTMessage *msg = entry->getCopy();
906       msg->bud1 = budPe;
907       msg->bud2 = CkMyPe();
908       msg->cp_flag = 0;            // not checkpointing
909       thisProxy[budPe].recoverEntry(msg);
910       expectCount ++;
911     }
912   }
913 #else
914   //send to failed pe
915   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
916 #if CK_NO_PROC_POOL
917       // find a new buddy
918       int budPe = BuddyPE(CkMyPe());
919 #else
920       int budPe = thisFailedPe;
921 #endif
922       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
923       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
924           msg->cp_flag = 0;            // not checkpointing
925       msg->bud1 = budPe;
926       msg->bud2 = CkMyPe();
927       thisProxy[budPe].recoverEntry(msg);
928       expectCount ++;
929   }
930 #endif
931
932 #if 1
933   if (expectCount == 0) {
934     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
935     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
936   }
937 #else
938   if (CkMyPe() == 0) {
939     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
940   }
941 #endif
942
943   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
944 }
945
946 void CkMemCheckPT::gotData()
947 {
948   ackCount ++;
949   if (ackCount == expectCount) {
950     ackCount = 0;
951     expectCount = -1;
952     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
953     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
954   }
955 }
956
957 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
958 {
959   for (int i=0; i<n; i++) {
960     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
961     mgr->updateLocation(idx[i], nowOnPe);
962   }
963 }
964
965 // restore array elements
966 void CkMemCheckPT::recoverArrayElements()
967 {
968   double curTime = CmiWallTimer();
969   int len = ckTable.length();
970   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
971   stage = (char *)"recoverArrayElements";
972   if (CkMyPe() == thisFailedPe)
973   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
974   startTime = curTime;
975  int flag = 0;
976   // recover all array elements
977   int count = 0;
978 //#if STREAMING_INFORMHOME
979   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
980   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
981 //#endif
982 #if !CMK_CHKP_ALL
983   for (int idx=0; idx<len; idx++)
984   {
985     CkCheckPTInfo *entry = ckTable[idx];
986 #if CK_NO_PROC_POOL
987     // the bigger one will do 
988 //    if (CkMyPe() < entry->pNo) continue;
989     if (!isMaster(entry->pNo)) continue;
990 #else
991     // smaller one do it, which has the original object
992     if (CkMyPe() == entry->pNo+1 || 
993         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
994 #endif
995 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
996
997     entry->updateBuddy(CkMyPe(), entry->pNo);
998     CkArrayCheckPTMessage *msg = entry->getCopy();
999     // gzheng
1000     //thisProxy[CkMyPe()].inmem_restore(msg);
1001     inmem_restore(msg);
1002 #if STREAMING_INFORMHOME
1003     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1004     int homePe = mgr->homePe(msg->index);
1005     if (homePe != CkMyPe()) {
1006       gmap[homePe].push_back(msg->locMgr);
1007       imap[homePe].push_back(msg->index);
1008     }
1009 #endif
1010     CkFreeMsg(msg);
1011     count ++;
1012   }
1013 #else
1014         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1015         recoverAll(msg,gmap,imap);
1016     CkFreeMsg(msg);
1017 #endif
1018   curTime = CmiWallTimer();
1019   if (CkMyPe() == thisFailedPe)
1020         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1021 #if STREAMING_INFORMHOME
1022   for (int i=0; i<CkNumPes(); i++) {
1023     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1024       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1025         //CkPrintf("[%d] send to %d at %lf\n",CkMyPe(),i,CmiWallTimer());
1026         flag++; 
1027           }
1028   }
1029 #endif
1030   delete [] imap;
1031   delete [] gmap;
1032   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1033
1034   CKLOCMGR_LOOP(mgr->doneInserting(););
1035
1036   // _crashedNode = -1;
1037   CpvAccess(_crashedNode) = -1;
1038 if(CkMyPe()!=thisFailedPe)
1039   inRestarting = 0;
1040  // if (CkMyPe() == 0)
1041    // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1042 if(flag == 0)
1043 {
1044     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1045 }
1046 }
1047
1048 void CkMemCheckPT::gotReply(){
1049         //CkPrintf("[%d] got reply at %lf\n",CkMyPe(),CmiWallTimer());
1050     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1051 }
1052
1053 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1054 #if CMK_CHKP_ALL
1055         PUP::fromMem p(msg->packData);
1056         int numElements;
1057         p|numElements;
1058         if(p.isUnpacking()){
1059                 for(int i=0;i<numElements;i++){
1060                         CkGroupID gID;
1061                         CkArrayIndex idx;
1062                         p|gID;
1063                         p|idx;
1064                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1065                         int homePe = mgr->homePe(idx);
1066 #if STREAMING_INFORMHOME
1067                         mgr->resume(idx,p,CmiFalse);
1068 #else
1069                         if(homePe == thisFailedPe && homePe!=CkMyPe()){
1070                                 mgr->resume(idx,p,CmiTrue);
1071        // CkPrintf("[%d] send to crashed pe %d\n",CkMyPe(),thisFailedPe);
1072       }
1073       else
1074                                 mgr->resume(idx,p,CmiFalse);
1075 #endif
1076                           /*CkLocRec_local *rec = loc.getLocalRecord();
1077                           CmiAssert(rec);
1078                           CkVec<CkMigratable *> list;
1079                           mgr->migratableList(rec, list);
1080                           CmiAssert(list.length() > 0);
1081                           for (int l=0; l<list.length(); l++) {
1082                                 ArrayElement * elt = (ArrayElement *)list[l];
1083                                 //    reset, may not needed now
1084                                 // for now.
1085                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1086                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1087                                   if (c) c->redNo = 0;
1088                                 }
1089                           }*/
1090 #if STREAMING_INFORMHOME
1091                         homePe = mgr->homePe(idx);
1092                         if (homePe != CkMyPe()) {
1093                           gmap[homePe].push_back(gID);
1094                           imap[homePe].push_back(idx);
1095                         }
1096 #endif
1097                 }
1098         }
1099 #endif
1100 }
1101
1102 static double restartT;
1103
1104 // on every processor
1105 // turn load balancer back on
1106 void CkMemCheckPT::finishUp()
1107 {
1108   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1109   //CKLOCMGR_LOOP(mgr->doneInserting(););
1110   
1111   if (CkMyPe() == thisFailedPe)
1112   {
1113         inRestarting=0;
1114        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1115        //CkStartQD(cpCallback);
1116        cpCallback.send();
1117        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1118   }
1119
1120 #if CK_NO_PROC_POOL
1121 #if NODE_CHECKPOINT
1122   int numnodes = CmiNumPhysicalNodes();
1123 #else
1124   int numnodes = CkNumPes();
1125 #endif
1126   if (numnodes-totalFailed() <=2) {
1127     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1128     _memChkptOn = 0;
1129   }
1130 #endif
1131 }
1132
1133 // called only on 0
1134 void CkMemCheckPT::quiescence(CkCallback &cb)
1135 {
1136   static int pe_count = 0;
1137   pe_count ++;
1138   CmiAssert(CkMyPe() == 0);
1139   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1140   if (pe_count == CkNumPes()) {
1141     pe_count = 0;
1142     cb.send();
1143   }
1144 }
1145
1146 // User callable function - to start a checkpoint
1147 // callback cb is used to pass control back
1148 void CkStartMemCheckpoint(CkCallback &cb)
1149 {
1150 #if CMK_MEM_CHECKPOINT
1151   if (_memChkptOn == 0) {
1152     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1153     cb.send();
1154     return;
1155   }
1156   if (CkInRestarting()) {
1157       // trying to checkpointing during restart
1158     cb.send();
1159     return;
1160   }
1161     // store user callback and user data
1162   CkMemCheckPT::cpCallback = cb;
1163
1164     // broadcast to start check pointing
1165   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1166   checkptMgr.doItNow(CkMyPe(), cb);
1167 #else
1168   // when mem checkpoint is disabled, invike cb immediately
1169   cb.send();
1170 #endif
1171 }
1172
1173 void CkRestartCheckPoint(int diePe)
1174 {
1175 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1176   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1177   // broadcast
1178   checkptMgr.restart(diePe);
1179 }
1180
1181 static int _diePE = -1;
1182
1183 // callback function used locally by ccs handler
1184 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1185 {
1186 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1187   CkRestartCheckPoint(_diePE);
1188 }
1189
1190 // Converse function handles
1191 static int askPhaseHandlerIdx;
1192 static int recvPhaseHandlerIdx;
1193 static int askProcDataHandlerIdx;
1194 static int restartBcastHandlerIdx;
1195 static int recoverProcDataHandlerIdx;
1196 static int restartBeginHandlerIdx;
1197 static int notifyHandlerIdx;
1198
1199 // called on crashed PE
1200 static void restartBeginHandler(char *msg)
1201 {
1202   CmiFree(msg);
1203 #if CMK_MEM_CHECKPOINT
1204 #if CMK_USE_BARRIER
1205         if(CkMyPe()!=_diePE){
1206                 printf("restar begin on %d\n",CkMyPe());
1207                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1208                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1209                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1210         }else{
1211         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1212         CkRestartCheckPointCallback(NULL, NULL);
1213         }
1214 #else
1215   static int count = 0;
1216   CmiAssert(CkMyPe() == _diePE);
1217   count ++;
1218   if (count == CkNumPes()) {
1219     CkRestartCheckPointCallback(NULL, NULL);
1220     count = 0;
1221   }
1222 #endif
1223 #endif
1224 }
1225
1226 extern void _discard_charm_message();
1227 extern void _resume_charm_message();
1228
1229 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1230         return data;
1231 }
1232
1233 static void restartBcastHandler(char *msg)
1234 {
1235 #if CMK_MEM_CHECKPOINT
1236   // advance phase counter
1237   CkMemCheckPT::inRestarting = 1;
1238   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1239   // gzheng
1240   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1241
1242   if (CkMyPe()==_diePE)
1243     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1244
1245   // reset QD counters
1246 /*  gzheng
1247   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1248 */
1249
1250 /*  gzheng
1251   if (CkMyPe()==_diePE)
1252       CkRestartCheckPointCallback(NULL, NULL);
1253 */
1254   CmiFree(msg);
1255
1256   _resume_charm_message();
1257
1258     // reduction
1259   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1260   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1261 #if CMK_USE_BARRIER
1262         //CmiPrintf("before reduce\n"); 
1263         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1264         //CmiPrintf("after reduce\n");  
1265 #else
1266   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1267 #endif 
1268  checkpointed = 0;
1269 #endif
1270 }
1271
1272 extern void _initDone();
1273 #include "hwi/include/bqc/A2_inlines.h"
1274
1275 // called on crashed processor
1276 static void recoverProcDataHandler(char *msg)
1277 {
1278 #if CMK_MEM_CHECKPOINT
1279    int i;
1280    envelope *env = (envelope *)msg;
1281    CkUnpackMessage(&env);
1282    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1283    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1284    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1285    //cur_restart_phase ++;
1286      // gzheng ?
1287    //CpvAccess(_qd)->flushStates();
1288
1289    // restore readonly, mainchare, group, nodegroup
1290 //   int temp = cur_restart_phase;
1291 //   cur_restart_phase = -1;
1292    PUP::fromMem p(procMsg->packData);
1293    _handleProcData(p);
1294
1295    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1296    // gzheng
1297    CKLOCMGR_LOOP(mgr->startInserting(););
1298
1299    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1300    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1301    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1302    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1303
1304    _initDone();
1305 //   CpvAccess(_qd)->flushStates();
1306    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1307 #endif
1308 }
1309
1310 // called on its backup processor
1311 // get backup message buffer and sent to crashed processor
1312 static void askProcDataHandler(char *msg)
1313 {
1314 #if CMK_MEM_CHECKPOINT
1315     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1316     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1317     if (CpvAccess(procChkptBuf) == NULL)  {
1318       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1319       CkAbort("no checkpoint found");
1320     }
1321     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1322     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1323
1324     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1325
1326     CkPackMessage(&env);
1327     CmiSetHandler(env, recoverProcDataHandlerIdx);
1328     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1329     CpvAccess(procChkptBuf) = NULL;
1330     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1331 #endif
1332 }
1333
1334 // called on PE 0
1335 void qd_callback(void *m)
1336 {
1337    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1338    CkFreeMsg(m);
1339 #ifdef CMK_SMP
1340    for(int i=0;i<CmiMyNodeSize();i++){
1341    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1342    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1343         CmiSetHandler(msg, askProcDataHandlerIdx);
1344         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1345         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1346    }
1347    return;
1348 #endif
1349    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1350    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1351    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1352    CmiSetHandler(msg, askProcDataHandlerIdx);
1353    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1354    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1355
1356 }
1357
1358 // on crashed node
1359 void CkMemRestart(const char *dummy, CkArgMsg *args)
1360 {
1361 #if CMK_MEM_CHECKPOINT
1362    _diePE = CmiMyNode();
1363    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1364    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1365    CkMemCheckPT::inRestarting = 1;
1366
1367   CpvAccess( _crashedNode )= CmiMyNode();
1368         
1369   _discard_charm_message();
1370     restartT = CmiWallTimer();
1371    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1372   
1373   /*if(CmiMyRank()==0){
1374     CkCallback cb(qd_callback);
1375     CkStartQD(cb);
1376     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1377   }*/
1378    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1379    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1380    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1381    CmiSetHandler(msg, askProcDataHandlerIdx);
1382    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1383    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1384 #else
1385    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1386 #endif
1387 }
1388
1389 // can be called in other files
1390 // return true if it is in restarting
1391 extern "C"
1392 int CkInRestarting()
1393 {
1394 #if CMK_MEM_CHECKPOINT
1395   if (CpvAccess( _crashedNode)!=-1) return 1;
1396   // gzheng
1397   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1398   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1399   return CkMemCheckPT::inRestarting;
1400 #else
1401   return 0;
1402 #endif
1403 }
1404
1405 extern "C"
1406 void CkSetInLdb(){
1407 #if CMK_MEM_CHECKPOINT
1408         CkMemCheckPT::inLoadbalancing = 1;
1409 #endif
1410 }
1411
1412 extern "C"
1413 int CkInLdb(){
1414 #if CMK_MEM_CHECKPOINT
1415         return CkMemCheckPT::inLoadbalancing;
1416 #endif
1417 }
1418
1419 extern "C"
1420 void CkResetInLdb(){
1421 #if CMK_MEM_CHECKPOINT
1422         CkMemCheckPT::inLoadbalancing = 0;
1423 #endif
1424 }
1425
1426 /*****************************************************************************
1427                 module initialization
1428 *****************************************************************************/
1429
1430 static int arg_where = CkCheckPoint_inMEM;
1431
1432 #if CMK_MEM_CHECKPOINT
1433 void init_memcheckpt(char **argv)
1434 {
1435     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1436       arg_where = CkCheckPoint_inDISK;
1437     }
1438
1439         // initiliazing _crashedNode variable
1440         CpvInitialize(int, _crashedNode);
1441         CpvAccess(_crashedNode) = -1;
1442
1443 }
1444 #endif
1445
1446 class CkMemCheckPTInit: public Chare {
1447 public:
1448   CkMemCheckPTInit(CkArgMsg *m) {
1449 #if CMK_MEM_CHECKPOINT
1450     if (arg_where == CkCheckPoint_inDISK) {
1451       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1452     }
1453     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1454     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1455 #endif
1456   }
1457 };
1458
1459 static void notifyHandler(char *msg)
1460 {
1461 #if CMK_MEM_CHECKPOINT
1462   CmiFree(msg);
1463       /* immediately increase restart phase to filter old messages */
1464   CpvAccess(_curRestartPhase) ++;
1465   CpvAccess(_qd)->flushStates();
1466   _discard_charm_message();
1467
1468 #endif
1469 }
1470
1471 extern "C"
1472 void notify_crash(int node)
1473 {
1474 #ifdef CMK_MEM_CHECKPOINT
1475   CpvAccess( _crashedNode) = node;
1476 #ifdef CMK_SMP
1477   for(int i=0;i<CkMyNodeSize();i++){
1478         CpvAccessOther(_crashedNode,i)=node;
1479   }
1480 #endif
1481   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1482   CkMemCheckPT::inRestarting = 1;
1483
1484     // this may be in interrupt handler, send a message to reset QD
1485   int pe = CmiNodeFirst(CkMyNode());
1486   for(int i=0;i<CkMyNodeSize();i++){
1487         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1488         CmiSetHandler(msg, notifyHandlerIdx);
1489         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1490   }
1491 #endif
1492 }
1493
1494 extern "C" void (*notify_crash_fn)(int node);
1495
1496 #if CMK_CONVERSE_MPI
1497 static int pingHandlerIdx;
1498 static int pingCheckHandlerIdx;
1499 static int buddyDieHandlerIdx;
1500 static double lastPingTime = -1;
1501
1502 extern "C" void mpi_restart_crashed(int pe, int rank);
1503 extern "C" int  find_spare_mpirank(int pe);
1504
1505 void pingBuddy();
1506 void pingCheckHandler();
1507
1508 void buddyDieHandler(char *msg)
1509 {
1510 #if CMK_MEM_CHECKPOINT
1511    // notify
1512    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1513    notify_crash(diepe);
1514    // send message to crash pe to let it restart
1515    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1516    int newrank = find_spare_mpirank(diepe);
1517    int buddy = obj->BuddyPE(CmiMyPe());
1518    if (buddy == diepe)  {
1519      mpi_restart_crashed(diepe, newrank);
1520      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1521    }
1522 #endif
1523 }
1524
1525 void pingHandler(void *msg)
1526 {
1527   lastPingTime = CmiWallTimer();
1528   CmiFree(msg);
1529 }
1530
1531 void pingCheckHandler()
1532 {
1533 #if CMK_MEM_CHECKPOINT
1534   double now = CmiWallTimer();
1535   if (lastPingTime > 0 && now - lastPingTime > 20 && !CkInLdb()) {
1536     int i, pe, buddy;
1537     // tell everyone the buddy dies
1538     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1539     for (i = 1; i < CmiNumPes(); i++) {
1540        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1541        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1542     }
1543     buddy = pe;
1544     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1545     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1546       if (obj->isFailed(pe) || pe == buddy) continue;
1547       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1548       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1549       CmiSetHandler(msg, buddyDieHandlerIdx);
1550       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1551     }*/
1552     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1553     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1554     CmiSetHandler(msg, buddyDieHandlerIdx);
1555     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1556   }
1557   else 
1558     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1559 #endif
1560 }
1561
1562 void pingBuddy()
1563 {
1564 #if CMK_MEM_CHECKPOINT
1565   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1566   if (obj) {
1567     int buddy = obj->BuddyPE(CkMyPe());
1568 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1569     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1570     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1571     CmiSetHandler(msg, pingHandlerIdx);
1572     CmiGetRestartPhase(msg) = 9999;
1573     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1574   }
1575   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1576 #endif
1577 }
1578 #endif
1579
1580 // initproc
1581 void CkRegisterRestartHandler( )
1582 {
1583 #if CMK_MEM_CHECKPOINT
1584   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1585   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1586   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1587   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1588   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1589
1590 #if CMK_CONVERSE_MPI
1591   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1592   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1593   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1594 #endif
1595
1596   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1597   CpvAccess(procChkptBuf) = NULL;
1598
1599   notify_crash_fn = notify_crash;
1600
1601 #if ! CMK_CONVERSE_MPI
1602   // print pid to kill
1603 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1604 //  sleep(4);
1605 #endif
1606 #endif
1607 }
1608
1609
1610 extern "C"
1611 int CkHasCheckpoints()
1612 {
1613   return checkpointed;
1614 }
1615
1616 /// @todo: the following definitions should be moved to a separate file containing
1617 // structures and functions about fault tolerance strategies
1618
1619 /**
1620  *  * @brief: function for killing a process                                             
1621  *   */
1622 #ifdef CMK_MEM_CHECKPOINT
1623 #if CMK_HAS_GETPID
1624 void killLocal(void *_dummy,double curWallTime){
1625         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1626         if(CmiWallTimer()<killTime-1){
1627                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1628         }else{ 
1629 #if CMK_CONVERSE_MPI
1630                                 CkDieNow();
1631 #else 
1632                 kill(getpid(),SIGKILL);                                               
1633 #endif
1634         }              
1635
1636 #else
1637 void killLocal(void *_dummy,double curWallTime){
1638   CmiAbort("kill() not supported!");
1639 }
1640 #endif
1641 #endif
1642
1643 #ifdef CMK_MEM_CHECKPOINT
1644 /**
1645  * @brief: reads the file with the kill information
1646  */
1647 void readKillFile(){
1648         FILE *fp=fopen(killFile,"r");
1649         if(!fp){
1650                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1651                 return;
1652         }
1653         int proc;
1654         double sec;
1655         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1656                 if(proc == CkMyNode() && CkMyRank() == 0){
1657                         killTime = CmiWallTimer()+sec;
1658                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1659                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1660                 }
1661         }
1662         fclose(fp);
1663 }
1664
1665 #if ! CMK_CONVERSE_MPI
1666 void CkDieNow()
1667 {
1668          // ignored for non-mpi version
1669         CmiPrintf("[%d] die now.\n", CmiMyPe());
1670         killTime = CmiWallTimer()+0.001;
1671         CcdCallFnAfter(killLocal,NULL,1);
1672 }
1673 #endif
1674
1675 #endif
1676
1677 #include "CkMemCheckpoint.def.h"
1678
1679