streaming regarding changes
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         1
70 #define STREAMING_INFORMHOME                    0
71 CpvDeclare(int, _crashedNode);
72
73 // static, so that it is accessible from Converse part
74 int CkMemCheckPT::inRestarting = 0;
75 int CkMemCheckPT::inLoadbalancing = 0;
76 double CkMemCheckPT::startTime;
77 char *CkMemCheckPT::stage;
78 CkCallback CkMemCheckPT::cpCallback;
79
80 int _memChkptOn = 1;                    // checkpoint is on or off
81
82 CkGroupID ckCheckPTGroupID;             // readonly
83
84 static int checkpointed = 0;
85
86 /// @todo the following declarations should be moved into a separate file for all 
87 // fault tolerant strategies
88
89 #ifdef CMK_MEM_CHECKPOINT
90 // name of the kill file that contains processes to be killed 
91 char *killFile;                                               
92 // flag for the kill file         
93 int killFlag=0;
94 // variable for storing the killing time
95 double killTime=0.0;
96 #endif
97
98 #ifdef CKLOCMGR_LOOP
99 #undef CKLOCMGR_LOOP
100 #endif
101 // loop over all CkLocMgr and do "code"
102 #define  CKLOCMGR_LOOP(code)    {       \
103   int numGroups = CkpvAccess(_groupIDTable)->size();    \
104   for(int i=0;i<numGroups;i++) {        \
105     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
106     if(obj->isLocMgr())  {      \
107       CkLocMgr *mgr = (CkLocMgr*)obj;   \
108       code      \
109     }   \
110   }     \
111  }
112
113 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
114 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
115
116 // compute the backup processor
117 // FIXME: avoid crashed processors
118 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
119
120 inline int CkMemCheckPT::BuddyPE(int pe)
121 {
122   int budpe;
123 #if NODE_CHECKPOINT
124     // buddy is the processor with same rank on the next physical node
125   int r1 = CmiPhysicalRank(pe);
126   int budnode = CmiPhysicalNodeID(pe);
127   do {
128     budnode = (budnode+1)%CmiNumPhysicalNodes();
129     int *pelist;
130     int num;
131     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
132     budpe = pelist[r1 % num];
133   } while (isFailed(budpe));
134   if (budpe == pe) {
135     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
136     CmiAbort("Failed to find a buddy processor");
137   }
138 #else
139   budpe = pe;
140   while (budpe == pe || isFailed(budpe)) 
141           budpe = (budpe+1)%CkNumPes();
142 #endif
143   return budpe;
144 }
145
146 // called in array element constructor
147 // choose and register with 2 buddies for checkpoiting 
148 #if CMK_MEM_CHECKPOINT
149 void ArrayElement::init_checkpt() {
150         if (_memChkptOn == 0) return;
151         if (CkInRestarting()) {
152           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
153         }
154         // only master init checkpoint
155         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
156
157         budPEs[0] = CkMyPe();
158         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
159         CmiAssert(budPEs[0] != budPEs[1]);
160         // inform checkPTMgr
161         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
162         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
163         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
164         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
165 }
166 #endif
167
168 // entry function invoked by checkpoint mgr asking for checkpoint data
169 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
170 #if CMK_MEM_CHECKPOINT
171 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
172 //char index[128];   thisIndexMax.sprint(index);
173 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
174   CkLocMgr *locMgr = thisArray->getLocMgr();
175   CmiAssert(myRec!=NULL);
176   int size;
177   {
178         PUP::sizer p;
179         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
180         size = p.size();
181   }
182   int packSize = size/sizeof(double) +1;
183   CkArrayCheckPTMessage *msg =
184                  new (packSize, 0) CkArrayCheckPTMessage;
185   msg->len = size;
186   msg->index =thisIndexMax;
187   msg->aid = thisArrayID;
188   msg->locMgr = locMgr->getGroupID();
189   msg->cp_flag = 1;
190   {
191         PUP::toMem p(msg->packData);
192         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
193   }
194
195   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
196   checkptMgr.recvData(msg, 2, budPEs);
197   delete m;
198 #endif
199 }
200
201 // checkpoint holder class - for memory checkpointing
202 class CkMemCheckPTInfo: public CkCheckPTInfo
203 {
204   CkArrayCheckPTMessage *ckBuffer;
205 public:
206   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
207                     CkCheckPTInfo(a, loc, idx, pno)
208   {
209     ckBuffer = NULL;
210   }
211   ~CkMemCheckPTInfo() 
212   {
213     if (ckBuffer) delete ckBuffer; 
214   }
215   inline void updateBuffer(CkArrayCheckPTMessage *data) 
216   {
217     CmiAssert(data!=NULL);
218     if (ckBuffer) delete ckBuffer;
219     ckBuffer = data;
220   }    
221   inline CkArrayCheckPTMessage * getCopy()
222   {
223     if (ckBuffer == NULL) {
224       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
225       CmiAbort("Abort!");
226     }
227     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
228   }     
229   inline void updateBuddy(int b1, int b2) {
230      CmiAssert(ckBuffer);
231      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
232      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
233      CmiAssert(pNo != CkMyPe());
234   }
235   inline int getSize() { 
236      CmiAssert(ckBuffer);
237      return ckBuffer->len; 
238   }
239 };
240
241 // checkpoint holder class - for in-disk checkpointing
242 class CkDiskCheckPTInfo: public CkCheckPTInfo 
243 {
244   char *fname;
245   int bud1, bud2;
246   int len;                      // checkpoint size
247 public:
248   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
249   {
250 #if CMK_USE_MKSTEMP
251     fname = new char[64];
252     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
253     mkstemp(fname);
254 #else
255     fname=tmpnam(NULL);
256 #endif
257     bud1 = bud2 = -1;
258     len = 0;
259   }
260   ~CkDiskCheckPTInfo() 
261   {
262     remove(fname);
263   }
264   inline void updateBuffer(CkArrayCheckPTMessage *data) 
265   {
266     double t = CmiWallTimer();
267     // unpack it
268     envelope *env = UsrToEnv(data);
269     CkUnpackMessage(&env);
270     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
271     FILE *f = fopen(fname,"wb");
272     PUP::toDisk p(f);
273     CkPupMessage(p, (void **)&data);
274     // delay sync to the end because otherwise the messages are blocked
275 //    fsync(fileno(f));
276     fclose(f);
277     bud1 = data->bud1;
278     bud2 = data->bud2;
279     len = data->len;
280     delete data;
281     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
282   }
283   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
284   {
285     CkArrayCheckPTMessage *data;
286     FILE *f = fopen(fname,"rb");
287     PUP::fromDisk p(f);
288     CkPupMessage(p, (void **)&data);
289     fclose(f);
290     data->bud1 = bud1;                          // update the buddies
291     data->bud2 = bud2;
292     return data;
293   }
294   inline void updateBuddy(int b1, int b2) {
295      bud1 = b1; bud2 = b2;
296      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
297      CmiAssert(pNo != CkMyPe());
298   }
299   inline int getSize() { 
300      return len; 
301   }
302 };
303
304 CkMemCheckPT::CkMemCheckPT(int w)
305 {
306   int numnodes = 0;
307 #if NODE_CHECKPOINT
308   numnodes = CmiNumPhysicalNodes();
309 #else
310   numnodes = CkNumPes();
311 #endif
312 #if CK_NO_PROC_POOL
313   if (numnodes <= 2)
314 #else
315   if (numnodes  == 1)
316 #endif
317   {
318     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
319     _memChkptOn = 0;
320   }
321   inRestarting = 0;
322   recvCount = peCount = 0;
323   ackCount = 0;
324   expectCount = -1;
325   where = w;
326
327 #if CMK_CONVERSE_MPI
328   void pingBuddy();
329   void pingCheckHandler();
330   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
331   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
332 #endif
333 }
334
335 CkMemCheckPT::~CkMemCheckPT()
336 {
337   int len = ckTable.length();
338   for (int i=0; i<len; i++) {
339     delete ckTable[i];
340   }
341 }
342
343 void CkMemCheckPT::pup(PUP::er& p) 
344
345   CBase_CkMemCheckPT::pup(p); 
346   p|cpStarter;
347   p|thisFailedPe;
348   p|failedPes;
349   p|ckCheckPTGroupID;           // recover global variable
350   p|cpCallback;                 // store callback
351   p|where;                      // where to checkpoint
352   p|peCount;
353   if (p.isUnpacking()) {
354     recvCount = 0;
355 #if CMK_CONVERSE_MPI
356     void pingBuddy();
357     void pingCheckHandler();
358     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
359     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
360 #endif
361   }
362 }
363
364 // called by checkpoint mgr to restore an array element
365 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
366 {
367 #if CMK_MEM_CHECKPOINT
368   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
369   // m->index.print();
370   PUP::fromMem p(m->packData);
371   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
372   CmiAssert(mgr);
373 #if STREAMING_INFORMHOME
374   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
375 #else
376   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
377 #endif
378
379   // find a list of array elements bound together
380   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
381   CmiAssert(elt);
382   CkLocRec_local *rec = elt->myRec;
383   CkVec<CkMigratable *> list;
384   mgr->migratableList(rec, list);
385   CmiAssert(list.length() > 0);
386   for (int l=0; l<list.length(); l++) {
387     elt = (ArrayElement *)list[l];
388     elt->budPEs[0] = m->bud1;
389     elt->budPEs[1] = m->bud2;
390     //    reset, may not needed now
391     // for now.
392     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
393       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
394       if (c) c->redNo = 0;
395     }
396   }
397 #endif
398 }
399
400 // return 1 if pe is a crashed processor
401 int CkMemCheckPT::isFailed(int pe)
402 {
403   for (int i=0; i<failedPes.length(); i++)
404     if (failedPes[i] == pe) return 1;
405   return 0;
406 }
407
408 // add pe into history list of all failed processors
409 void CkMemCheckPT::failed(int pe)
410 {
411   if (isFailed(pe)) return;
412   failedPes.push_back(pe);
413 }
414
415 int CkMemCheckPT::totalFailed()
416 {
417   return failedPes.length();
418 }
419
420 // create an checkpoint entry for array element of aid with index.
421 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
422 {
423   // error check, no duplicate
424   int idx, len = ckTable.size();
425   for (idx=0; idx<len; idx++) {
426     CkCheckPTInfo *entry = ckTable[idx];
427     if (index == entry->index) {
428       if (loc == entry->locMgr) {
429           // bindTo array elements
430           return;
431       }
432         // for array inheritance, the following check may fail
433         // because ArrayElement constructor of all superclasses are called
434       if (aid == entry->aid) {
435         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
436         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
437       }
438     }
439   }
440   CkCheckPTInfo *newEntry;
441   if (where == CkCheckPoint_inMEM)
442     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
443   else
444     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
445   ckTable.push_back(newEntry);
446   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
447 }
448
449 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
450 {
451 #if !CMK_CHKP_ALL       
452   int buddy = msg->bud1;
453   if (buddy == CkMyPe()) buddy = msg->bud2;
454   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
455   recvData(msg);
456     // ack
457   thisProxy[buddy].gotData();
458 #else
459   recvArrayCheckpoint(msg);
460   thisProxy[msg->bud2].gotData();
461 #endif
462 }
463
464 // loop through my checkpoint table and ask checkpointed array elements
465 // to send me checkpoint data.
466 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
467 {
468   checkpointed = 1;
469   cpCallback = cb;
470   cpStarter = starter;
471   if (CkMyPe() == cpStarter) {
472     startTime = CmiWallTimer();
473     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
474   }
475 #if !CMK_CHKP_ALL
476   int len = ckTable.length();
477   for (int i=0; i<len; i++) {
478     CkCheckPTInfo *entry = ckTable[i];
479       // always let the bigger number processor send request
480     //if (CkMyPe() < entry->pNo) continue;
481       // always let the smaller number processor send request, may on same proc
482     if (!isMaster(entry->pNo)) continue;
483       // call inmem_checkpoint to the array element, ask it to send
484       // back checkpoint data via recvData().
485     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
486     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
487   }
488     // if my table is empty, then I am done
489   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
490 #else
491   startArrayCheckpoint();
492 #endif
493   // pack and send proc level data
494   sendProcData();
495 }
496
497 class MemElementPacker : public CkLocIterator{
498         private:
499                 CkLocMgr *locMgr;
500                 PUP::er &p;
501         public:
502                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
503                 void addLocation(CkLocation &loc){
504                         CkArrayIndexMax idx = loc.getIndex();
505                         CkGroupID gID = locMgr->ckGetGroupID();
506                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
507                         CmiAssert(elt);
508                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
509                         p|gID;
510                         p|idx;
511                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
512                 }
513 };
514
515 void CkMemCheckPT::pupAllElements(PUP::er &p){
516 #if CMK_CHKP_ALL
517         int numElements;
518         if(!p.isUnpacking()){
519                 numElements = CkCountArrayElements();
520         }
521         p | numElements;
522         if(!p.isUnpacking()){
523                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
524         }
525 #endif
526 }
527
528 void CkMemCheckPT::startArrayCheckpoint(){
529 #if CMK_CHKP_ALL
530         int size;
531         {
532                 PUP::sizer psizer;
533                 pupAllElements(psizer);
534                 size = psizer.size();
535         }
536         int packSize = size/sizeof(double)+1;
537         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
538         msg->len = size;
539         msg->cp_flag = 1;
540         int budPEs[2];
541         msg->bud1=CkMyPe();
542         msg->bud2=ChkptOnPe(CkMyPe());
543         {
544                 PUP::toMem p(msg->packData);
545                 pupAllElements(p);
546         }
547         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
548         chkpTable[0] = msg;
549         recvCount++;
550 #endif
551 }
552
553 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
554 {
555 #if CMK_CHKP_ALL
556         int idx = 1;
557         if(msg->bud1 == CkMyPe()){
558                 idx = 0;
559         }
560         int isChkpting = msg->cp_flag;
561         chkpTable[idx] = msg;
562         if(isChkpting){
563                 recvCount++;
564                 if(recvCount == 2){
565                   if (where == CkCheckPoint_inMEM) {
566                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
567                   }
568                   else if (where == CkCheckPoint_inDISK) {
569                         // another barrier for finalize the writing using fsync
570                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
571                         contribute(0,NULL,CkReduction::sum_int,localcb);
572                   }
573                   else
574                         CmiAbort("Unknown checkpoint scheme");
575                   recvCount = 0;
576                 }
577         }
578 #endif
579 }
580
581 // don't handle array elements
582 static inline void _handleProcData(PUP::er &p)
583 {
584     // save readonlys, and callback BTW
585     CkPupROData(p);
586
587     // save mainchares 
588     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
589         
590 #ifndef CMK_CHARE_USE_PTR
591     // save non-migratable chare
592     CkPupChareData(p);
593 #endif
594
595     // save groups into Groups.dat
596     CkPupGroupData(p);
597
598     // save nodegroups into NodeGroups.dat
599     if(CkMyRank()==0) CkPupNodeGroupData(p);
600 }
601
602 void CkMemCheckPT::sendProcData()
603 {
604   // find out size of buffer
605   int size;
606   {
607     PUP::sizer p;
608     _handleProcData(p);
609     size = p.size();
610   }
611   int packSize = size;
612   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
613   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
614   {
615     PUP::toMem p(msg->packData);
616     _handleProcData(p);
617   }
618   msg->pe = CkMyPe();
619   msg->len = size;
620   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
621   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
622 }
623
624 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
625 {
626   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
627   CpvAccess(procChkptBuf) = msg;
628   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
629   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
630 }
631
632 // ArrayElement call this function to give us the checkpointed data
633 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
634 {
635   int len = ckTable.length();
636   int idx;
637   for (idx=0; idx<len; idx++) {
638     CkCheckPTInfo *entry = ckTable[idx];
639     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
640   }
641   CkAssert(idx < len);
642   int isChkpting = msg->cp_flag;
643   ckTable[idx]->updateBuffer(msg);
644   if (isChkpting) {
645       // all my array elements have returned their inmem data
646       // inform starter processor that I am done.
647     recvCount ++;
648     if (recvCount == ckTable.length()) {
649       if (where == CkCheckPoint_inMEM) {
650         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
651       }
652       else if (where == CkCheckPoint_inDISK) {
653         // another barrier for finalize the writing using fsync
654         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
655         contribute(0,NULL,CkReduction::sum_int,localcb);
656       }
657       else
658         CmiAbort("Unknown checkpoint scheme");
659       recvCount = 0;
660     } 
661   }
662 }
663
664 // only used in disk checkpointing
665 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
666 {
667   delete m;
668 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
669   system("sync");
670 #endif
671   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
672 }
673
674 // only is called on cpStarter when checkpoint is done
675 void CkMemCheckPT::cpFinish()
676 {
677   CmiAssert(CkMyPe() == cpStarter);
678   peCount++;
679     // now that all processors have finished, activate callback
680   if (peCount == 2) {
681     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
682     cpCallback.send();
683     peCount = 0;
684 #if !CMK_CHKP_ALL
685     thisProxy.report();
686 #endif
687   }
688 }
689
690 // for debugging, report checkpoint info
691 void CkMemCheckPT::report()
692 {
693   int objsize = 0;
694   int len = ckTable.length();
695   for (int i=0; i<len; i++) {
696     CkCheckPTInfo *entry = ckTable[i];
697     CmiAssert(entry);
698     objsize += entry->getSize();
699   }
700   CmiAssert(CpvAccess(procChkptBuf));
701 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
702 }
703
704 /*****************************************************************************
705                         RESTART Procedure
706 *****************************************************************************/
707
708 // master processor of two buddies
709 inline int CkMemCheckPT::isMaster(int buddype)
710 {
711 #if 0
712   int mype = CkMyPe();
713 //CkPrintf("ismaster: %d %d\n", pe, mype);
714   if (CkNumPes() - totalFailed() == 2) {
715     return mype > buddype;
716   }
717   for (int i=1; i<CkNumPes(); i++) {
718     int me = (buddype+i)%CkNumPes();
719     if (isFailed(me)) continue;
720     if (me == mype) return 1;
721     else return 0;
722   }
723   return 0;
724 #else
725     // smaller one
726   int mype = CkMyPe();
727 //CkPrintf("ismaster: %d %d\n", pe, mype);
728   if (CkNumPes() - totalFailed() == 2) {
729     return mype < buddype;
730   }
731 #if NODE_CHECKPOINT
732   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
733   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
734 #else
735   for (int i=1; i<CkNumPes(); i++) {
736 #endif
737     int me = (mype+i)%CkNumPes();
738     if (isFailed(me)) continue;
739     if (me == buddype) return 1;
740     else return 0;
741   }
742   return 0;
743 #endif
744 }
745
746
747
748 #if 0
749 // helper class to pup all elements that belong to same ckLocMgr
750 class ElementDestoryer : public CkLocIterator {
751 private:
752         CkLocMgr *locMgr;
753 public:
754         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
755         void addLocation(CkLocation &loc) {
756                 CkArrayIndex idx=loc.getIndex();
757                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
758                 loc.destroy();
759         }
760 };
761 #endif
762
763 // restore the bitmap vector for LB
764 void CkMemCheckPT::resetLB(int diepe)
765 {
766 #if CMK_LBDB_ON
767   int i;
768   char *bitmap = new char[CkNumPes()];
769   // set processor available bitmap
770   get_avail_vector(bitmap);
771
772   for (i=0; i<failedPes.length(); i++)
773     bitmap[failedPes[i]] = 0; 
774   bitmap[diepe] = 0;
775
776 #if CK_NO_PROC_POOL
777   set_avail_vector(bitmap);
778 #endif
779
780   // if I am the crashed pe, rebuild my failedPEs array
781   if (CkMyNode() == diepe)
782     for (i=0; i<CkNumPes(); i++) 
783       if (bitmap[i]==0) failed(i);
784
785   delete [] bitmap;
786 #endif
787 }
788
789 // in case when failedPe dies, everybody go through its checkpoint table:
790 // destory all array elements
791 // recover lost buddies
792 // reconstruct all array elements from check point data
793 // called on all processors
794 void CkMemCheckPT::restart(int diePe)
795 {
796 #if CMK_MEM_CHECKPOINT
797   double curTime = CmiWallTimer();
798   if (CkMyPe() == diePe)
799     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
800   stage = (char*)"resetLB";
801   startTime = curTime;
802   if (CkMyPe() == diePe)
803     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
804
805 #if CK_NO_PROC_POOL
806   failed(diePe);        // add into the list of failed pes
807 #endif
808   thisFailedPe = diePe;
809
810   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
811
812   inRestarting = 1;
813                                                                                 
814   // disable load balancer's barrier
815   if (CkMyPe() != diePe) resetLB(diePe);
816
817   CKLOCMGR_LOOP(mgr->startInserting(););
818
819   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
820   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
821 /*
822   if (CkMyPe() == 0)
823     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
824 */
825 #endif
826 }
827
828 // loally remove all array elements
829 void CkMemCheckPT::removeArrayElements()
830 {
831 #if CMK_MEM_CHECKPOINT
832   int len = ckTable.length();
833   double curTime = CmiWallTimer();
834   if (CkMyPe() == thisFailedPe) 
835     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
836   stage = (char*)"removeArrayElements";
837   startTime = curTime;
838
839   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
840   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
841
842   // get rid of all buffering and remote recs
843   // including destorying all array elements
844   CKLOCMGR_LOOP(mgr->flushAllRecs(););
845
846 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
847
848   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
849   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
850 #endif
851 }
852
853 // flush state in reduction manager
854 void CkMemCheckPT::resetReductionMgr()
855 {
856   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
857   int numGroups = CkpvAccess(_groupIDTable)->size();
858   for(int i=0;i<numGroups;i++) {
859     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
860     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
861     obj->flushStates();
862     obj->ckJustMigrated();
863   }
864   // reset again
865   //CpvAccess(_qd)->flushStates();
866
867 #if 1
868   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
869   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
870 #else
871   if (CkMyPe() == 0)
872     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
873 #endif
874 }
875
876 // recover the lost buddies
877 void CkMemCheckPT::recoverBuddies()
878 {
879   int idx;
880   int len = ckTable.length();
881   // ready to flush reduction manager
882   // cannot be CkMemCheckPT::restart because destory will modify states
883   double curTime = CmiWallTimer();
884   if (CkMyPe() == thisFailedPe)
885   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
886   stage = (char *)"recoverBuddies";
887   if (CkMyPe() == thisFailedPe)
888   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
889   startTime = curTime;
890
891   // recover buddies
892   expectCount = 0;
893 #if !CMK_CHKP_ALL
894   for (idx=0; idx<len; idx++) {
895     CkCheckPTInfo *entry = ckTable[idx];
896     if (entry->pNo == thisFailedPe) {
897 #if CK_NO_PROC_POOL
898       // find a new buddy
899       int budPe = BuddyPE(CkMyPe());
900 #else
901       int budPe = thisFailedPe;
902 #endif
903       CkArrayCheckPTMessage *msg = entry->getCopy();
904       msg->bud1 = budPe;
905       msg->bud2 = CkMyPe();
906       msg->cp_flag = 0;            // not checkpointing
907       thisProxy[budPe].recoverEntry(msg);
908       expectCount ++;
909     }
910   }
911 #else
912   //send to failed pe
913   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
914 #if CK_NO_PROC_POOL
915       // find a new buddy
916       int budPe = BuddyPE(CkMyPe());
917 #else
918       int budPe = thisFailedPe;
919 #endif
920       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
921       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
922           msg->cp_flag = 0;            // not checkpointing
923       msg->bud1 = budPe;
924       msg->bud2 = CkMyPe();
925       thisProxy[budPe].recoverEntry(msg);
926       expectCount ++;
927   }
928 #endif
929
930 #if 1
931   if (expectCount == 0) {
932     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
933     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
934   }
935 #else
936   if (CkMyPe() == 0) {
937     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
938   }
939 #endif
940
941   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
942 }
943
944 void CkMemCheckPT::gotData()
945 {
946   ackCount ++;
947   if (ackCount == expectCount) {
948     ackCount = 0;
949     expectCount = -1;
950     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
951     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
952   }
953 }
954
955 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
956 {
957   for (int i=0; i<n; i++) {
958     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
959     mgr->updateLocation(idx[i], nowOnPe);
960   }
961 }
962
963 #include "dcmf.h"
964 // restore array elements
965 void CkMemCheckPT::recoverArrayElements()
966 {
967   double curTime = CmiWallTimer();
968   int len = ckTable.length();
969   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
970   stage = (char *)"recoverArrayElements";
971   if (CkMyPe() == thisFailedPe)
972   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
973   startTime = curTime;
974  int flag = 0;
975   // recover all array elements
976   int count = 0;
977 //#if STREAMING_INFORMHOME
978   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
979   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
980 //#endif
981 #if !CMK_CHKP_ALL
982   for (int idx=0; idx<len; idx++)
983   {
984     CkCheckPTInfo *entry = ckTable[idx];
985 #if CK_NO_PROC_POOL
986     // the bigger one will do 
987 //    if (CkMyPe() < entry->pNo) continue;
988     if (!isMaster(entry->pNo)) continue;
989 #else
990     // smaller one do it, which has the original object
991     if (CkMyPe() == entry->pNo+1 || 
992         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
993 #endif
994 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
995
996     entry->updateBuddy(CkMyPe(), entry->pNo);
997     CkArrayCheckPTMessage *msg = entry->getCopy();
998     // gzheng
999     //thisProxy[CkMyPe()].inmem_restore(msg);
1000     inmem_restore(msg);
1001 #if STREAMING_INFORMHOME
1002     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1003     int homePe = mgr->homePe(msg->index);
1004     if (homePe != CkMyPe()) {
1005       gmap[homePe].push_back(msg->locMgr);
1006       imap[homePe].push_back(msg->index);
1007     }
1008 #endif
1009     CkFreeMsg(msg);
1010     count ++;
1011   }
1012 #else
1013         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1014         recoverAll(msg,gmap,imap);
1015     CkFreeMsg(msg);
1016 #endif
1017   curTime = CmiWallTimer();
1018   if (CkMyPe() == thisFailedPe)
1019         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1020 #if STREAMING_INFORMHOME
1021   for (int i=0; i<CkNumPes(); i++) {
1022     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1023       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1024         CkPrintf("[%d] send to %d at %lf\n",CkMyPe(),i,DCMF_Timer());
1025         flag++; 
1026         }
1027   }
1028 #endif
1029   delete [] imap;
1030   delete [] gmap;
1031   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1032
1033   CKLOCMGR_LOOP(mgr->doneInserting(););
1034
1035   // _crashedNode = -1;
1036   CpvAccess(_crashedNode) = -1;
1037 if(CkMyPe()!=thisFailedPe)
1038   inRestarting = 0;
1039  // if (CkMyPe() == 0)
1040    // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1041 if(flag == 0)
1042 {
1043     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1044 }
1045 }
1046
1047 void CkMemCheckPT::gotReply(){
1048         CkPrintf("[%d] got reply at %lf\n",CkMyPe(),DCMF_Timer());
1049     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1050 }
1051
1052 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1053 #if CMK_CHKP_ALL
1054         PUP::fromMem p(msg->packData);
1055         int numElements;
1056         p|numElements;
1057         if(p.isUnpacking()){
1058                 for(int i=0;i<numElements;i++){
1059                         CkGroupID gID;
1060                         CkArrayIndex idx;
1061                         p|gID;
1062                         p|idx;
1063                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1064                         int homePe = mgr->homePe(idx);
1065 #if STREAMING_INFORMHOME
1066                         mgr->resume(idx,p,CmiFalse);
1067 #else
1068                         if(homePe == thisFailedPe && homePe!=CkMyPe())
1069                                 mgr->resume(idx,p,CmiTrue);
1070                         else
1071                                 mgr->resume(idx,p,CmiFalse);
1072 #endif
1073                           /*CkLocRec_local *rec = loc.getLocalRecord();
1074                           CmiAssert(rec);
1075                           CkVec<CkMigratable *> list;
1076                           mgr->migratableList(rec, list);
1077                           CmiAssert(list.length() > 0);
1078                           for (int l=0; l<list.length(); l++) {
1079                                 ArrayElement * elt = (ArrayElement *)list[l];
1080                                 //    reset, may not needed now
1081                                 // for now.
1082                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1083                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1084                                   if (c) c->redNo = 0;
1085                                 }
1086                           }*/
1087 #if STREAMING_INFORMHOME
1088                         int homePe = mgr->homePe(idx);
1089                         if (homePe != CkMyPe()) {
1090                           gmap[homePe].push_back(gID);
1091                           imap[homePe].push_back(idx);
1092                         }
1093 #endif
1094                 }
1095         }
1096 #endif
1097 }
1098
1099 static double restartT;
1100
1101 // on every processor
1102 // turn load balancer back on
1103 void CkMemCheckPT::finishUp()
1104 {
1105   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1106   //CKLOCMGR_LOOP(mgr->doneInserting(););
1107   
1108   if (CkMyPe() == thisFailedPe)
1109   {
1110         inRestarting=0;
1111        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1112        //CkStartQD(cpCallback);
1113        cpCallback.send();
1114        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1115   }
1116
1117 #if CK_NO_PROC_POOL
1118 #if NODE_CHECKPOINT
1119   int numnodes = CmiNumPhysicalNodes();
1120 #else
1121   int numnodes = CkNumPes();
1122 #endif
1123   if (numnodes-totalFailed() <=2) {
1124     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1125     _memChkptOn = 0;
1126   }
1127 #endif
1128 }
1129
1130 // called only on 0
1131 void CkMemCheckPT::quiescence(CkCallback &cb)
1132 {
1133   static int pe_count = 0;
1134   pe_count ++;
1135   CmiAssert(CkMyPe() == 0);
1136   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1137   if (pe_count == CkNumPes()) {
1138     pe_count = 0;
1139     cb.send();
1140   }
1141 }
1142
1143 // User callable function - to start a checkpoint
1144 // callback cb is used to pass control back
1145 void CkStartMemCheckpoint(CkCallback &cb)
1146 {
1147 #if CMK_MEM_CHECKPOINT
1148   if (_memChkptOn == 0) {
1149     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1150     cb.send();
1151     return;
1152   }
1153   if (CkInRestarting()) {
1154       // trying to checkpointing during restart
1155     cb.send();
1156     return;
1157   }
1158     // store user callback and user data
1159   CkMemCheckPT::cpCallback = cb;
1160
1161     // broadcast to start check pointing
1162   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1163   checkptMgr.doItNow(CkMyPe(), cb);
1164 #else
1165   // when mem checkpoint is disabled, invike cb immediately
1166   cb.send();
1167 #endif
1168 }
1169
1170 void CkRestartCheckPoint(int diePe)
1171 {
1172 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1173   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1174   // broadcast
1175   checkptMgr.restart(diePe);
1176 }
1177
1178 static int _diePE = -1;
1179
1180 // callback function used locally by ccs handler
1181 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1182 {
1183 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1184   CkRestartCheckPoint(_diePE);
1185 }
1186
1187 // Converse function handles
1188 static int askPhaseHandlerIdx;
1189 static int recvPhaseHandlerIdx;
1190 static int askProcDataHandlerIdx;
1191 static int restartBcastHandlerIdx;
1192 static int recoverProcDataHandlerIdx;
1193 static int restartBeginHandlerIdx;
1194 static int notifyHandlerIdx;
1195
1196 // called on crashed PE
1197 static void restartBeginHandler(char *msg)
1198 {
1199   CmiFree(msg);
1200 #if CMK_MEM_CHECKPOINT
1201 #if CMK_USE_BARRIER
1202         if(CkMyPe()!=_diePE){
1203                 printf("restar begin on %d\n",CkMyPe());
1204                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1205                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1206                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1207         }else{
1208         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1209         CkRestartCheckPointCallback(NULL, NULL);
1210         }
1211 #else
1212   static int count = 0;
1213   CmiAssert(CkMyPe() == _diePE);
1214   count ++;
1215   if (count == CkNumPes()) {
1216     CkRestartCheckPointCallback(NULL, NULL);
1217     count = 0;
1218   }
1219 #endif
1220 #endif
1221 }
1222
1223 extern void _discard_charm_message();
1224 extern void _resume_charm_message();
1225
1226 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1227         return data;
1228 }
1229
1230 static void restartBcastHandler(char *msg)
1231 {
1232 #if CMK_MEM_CHECKPOINT
1233   // advance phase counter
1234   CkMemCheckPT::inRestarting = 1;
1235   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1236   // gzheng
1237   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1238
1239   if (CkMyPe()==_diePE)
1240     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1241
1242   // reset QD counters
1243 /*  gzheng
1244   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1245 */
1246
1247 /*  gzheng
1248   if (CkMyPe()==_diePE)
1249       CkRestartCheckPointCallback(NULL, NULL);
1250 */
1251   CmiFree(msg);
1252
1253   _resume_charm_message();
1254
1255     // reduction
1256   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1257   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1258 #if CMK_USE_BARRIER
1259         //CmiPrintf("before reduce\n"); 
1260         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1261         //CmiPrintf("after reduce\n");  
1262 #else
1263   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1264 #endif 
1265  checkpointed = 0;
1266 #endif
1267 }
1268
1269 extern void _initDone();
1270
1271 // called on crashed processor
1272 static void recoverProcDataHandler(char *msg)
1273 {
1274 #if CMK_MEM_CHECKPOINT
1275    int i;
1276    envelope *env = (envelope *)msg;
1277    CkUnpackMessage(&env);
1278    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1279    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1280    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1281    //cur_restart_phase ++;
1282      // gzheng ?
1283    //CpvAccess(_qd)->flushStates();
1284
1285    // restore readonly, mainchare, group, nodegroup
1286 //   int temp = cur_restart_phase;
1287 //   cur_restart_phase = -1;
1288    PUP::fromMem p(procMsg->packData);
1289    _handleProcData(p);
1290
1291    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1292    // gzheng
1293    CKLOCMGR_LOOP(mgr->startInserting(););
1294
1295    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1296    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1297    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1298    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1299
1300    _initDone();
1301 //   CpvAccess(_qd)->flushStates();
1302    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1303 #endif
1304 }
1305
1306 // called on its backup processor
1307 // get backup message buffer and sent to crashed processor
1308 static void askProcDataHandler(char *msg)
1309 {
1310 #if CMK_MEM_CHECKPOINT
1311     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1312     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1313     if (CpvAccess(procChkptBuf) == NULL)  {
1314       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1315       CkAbort("no checkpoint found");
1316     }
1317     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1318     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1319
1320     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1321
1322     CkPackMessage(&env);
1323     CmiSetHandler(env, recoverProcDataHandlerIdx);
1324     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1325     CpvAccess(procChkptBuf) = NULL;
1326 #endif
1327 }
1328
1329 // called on PE 0
1330 void qd_callback(void *m)
1331 {
1332    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1333    CkFreeMsg(m);
1334 #ifdef CMK_SMP
1335    for(int i=0;i<CmiMyNodeSize();i++){
1336    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1337    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1338         CmiSetHandler(msg, askProcDataHandlerIdx);
1339         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1340         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1341    }
1342    return;
1343 #endif
1344    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1345    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1346    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1347    CmiSetHandler(msg, askProcDataHandlerIdx);
1348    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1349    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1350
1351 }
1352
1353 // on crashed node
1354 void CkMemRestart(const char *dummy, CkArgMsg *args)
1355 {
1356 #if CMK_MEM_CHECKPOINT
1357    _diePE = CmiMyNode();
1358    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1359    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1360    CkMemCheckPT::inRestarting = 1;
1361
1362   CpvAccess( _crashedNode )= CmiMyNode();
1363         
1364   _discard_charm_message();
1365   
1366   if(CmiMyRank()==0){
1367     CkCallback cb(qd_callback);
1368     CkStartQD(cb);
1369     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1370   }
1371 #else
1372    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1373 #endif
1374 }
1375
1376 // can be called in other files
1377 // return true if it is in restarting
1378 extern "C"
1379 int CkInRestarting()
1380 {
1381 #if CMK_MEM_CHECKPOINT
1382   if (CpvAccess( _crashedNode)!=-1) return 1;
1383   // gzheng
1384   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1385   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1386   return CkMemCheckPT::inRestarting;
1387 #else
1388   return 0;
1389 #endif
1390 }
1391
1392 extern "C"
1393 void CkSetInLdb(){
1394 #if CMK_MEM_CHECKPOINT
1395         CkMemCheckPT::inLoadbalancing = 1;
1396 #endif
1397 }
1398
1399 extern "C"
1400 int CkInLdb(){
1401 #if CMK_MEM_CHECKPOINT
1402         return CkMemCheckPT::inLoadbalancing;
1403 #endif
1404 }
1405
1406 extern "C"
1407 void CkResetInLdb(){
1408 #if CMK_MEM_CHECKPOINT
1409         CkMemCheckPT::inLoadbalancing = 0;
1410 #endif
1411 }
1412
1413 /*****************************************************************************
1414                 module initialization
1415 *****************************************************************************/
1416
1417 static int arg_where = CkCheckPoint_inMEM;
1418
1419 #if CMK_MEM_CHECKPOINT
1420 void init_memcheckpt(char **argv)
1421 {
1422     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1423       arg_where = CkCheckPoint_inDISK;
1424     }
1425
1426         // initiliazing _crashedNode variable
1427         CpvInitialize(int, _crashedNode);
1428         CpvAccess(_crashedNode) = -1;
1429
1430 }
1431 #endif
1432
1433 class CkMemCheckPTInit: public Chare {
1434 public:
1435   CkMemCheckPTInit(CkArgMsg *m) {
1436 #if CMK_MEM_CHECKPOINT
1437     if (arg_where == CkCheckPoint_inDISK) {
1438       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1439     }
1440     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1441     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1442 #endif
1443   }
1444 };
1445
1446 static void notifyHandler(char *msg)
1447 {
1448 #if CMK_MEM_CHECKPOINT
1449   CmiFree(msg);
1450       /* immediately increase restart phase to filter old messages */
1451   CpvAccess(_curRestartPhase) ++;
1452   CpvAccess(_qd)->flushStates();
1453   _discard_charm_message();
1454 #endif
1455 }
1456
1457 extern "C"
1458 void notify_crash(int node)
1459 {
1460 #ifdef CMK_MEM_CHECKPOINT
1461   CpvAccess( _crashedNode) = node;
1462 #ifdef CMK_SMP
1463   for(int i=0;i<CkMyNodeSize();i++){
1464         CpvAccessOther(_crashedNode,i)=node;
1465   }
1466 #endif
1467   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1468   CkMemCheckPT::inRestarting = 1;
1469
1470     // this may be in interrupt handler, send a message to reset QD
1471   int pe = CmiNodeFirst(CkMyNode());
1472   for(int i=0;i<CkMyNodeSize();i++){
1473         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1474         CmiSetHandler(msg, notifyHandlerIdx);
1475         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1476   }
1477 #endif
1478 }
1479
1480 extern "C" void (*notify_crash_fn)(int node);
1481
1482 #if CMK_CONVERSE_MPI
1483 static int pingHandlerIdx;
1484 static int pingCheckHandlerIdx;
1485 static int buddyDieHandlerIdx;
1486 static double lastPingTime = -1;
1487
1488 extern "C" void mpi_restart_crashed(int pe, int rank);
1489 extern "C" int  find_spare_mpirank(int pe);
1490
1491 void pingBuddy();
1492 void pingCheckHandler();
1493
1494 void buddyDieHandler(char *msg)
1495 {
1496 #if CMK_MEM_CHECKPOINT
1497    // notify
1498    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1499    notify_crash(diepe);
1500    // send message to crash pe to let it restart
1501    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1502    int newrank = find_spare_mpirank(diepe);
1503    int buddy = obj->BuddyPE(CmiMyPe());
1504    if (buddy == diepe)  {
1505      mpi_restart_crashed(diepe, newrank);
1506      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1507    }
1508 #endif
1509 }
1510
1511 void pingHandler(void *msg)
1512 {
1513   lastPingTime = CmiWallTimer();
1514   CmiFree(msg);
1515 }
1516
1517 void pingCheckHandler()
1518 {
1519 #if CMK_MEM_CHECKPOINT
1520   double now = CmiWallTimer();
1521   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1522     int i, pe, buddy;
1523     // tell everyone the buddy dies
1524     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1525     for (i = 1; i < CmiNumPes(); i++) {
1526        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1527        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1528     }
1529     buddy = pe;
1530     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1531     for (int pe = 0; pe < CmiNumPes(); pe++) {
1532       if (obj->isFailed(pe) || pe == buddy) continue;
1533       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1534       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1535       CmiSetHandler(msg, buddyDieHandlerIdx);
1536       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1537     }
1538   }
1539   else 
1540     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1541 #endif
1542 }
1543
1544 void pingBuddy()
1545 {
1546 #if CMK_MEM_CHECKPOINT
1547   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1548   if (obj) {
1549     int buddy = obj->BuddyPE(CkMyPe());
1550 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1551     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1552     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1553     CmiSetHandler(msg, pingHandlerIdx);
1554     CmiGetRestartPhase(msg) = 9999;
1555     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1556   }
1557   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1558 #endif
1559 }
1560 #endif
1561
1562 // initproc
1563 void CkRegisterRestartHandler( )
1564 {
1565 #if CMK_MEM_CHECKPOINT
1566   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1567   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1568   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1569   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1570   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1571
1572 #if CMK_CONVERSE_MPI
1573   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1574   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1575   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1576 #endif
1577
1578   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1579   CpvAccess(procChkptBuf) = NULL;
1580
1581   notify_crash_fn = notify_crash;
1582
1583 #if ! CMK_CONVERSE_MPI
1584   // print pid to kill
1585 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1586 //  sleep(4);
1587 #endif
1588 #endif
1589 }
1590
1591
1592 extern "C"
1593 int CkHasCheckpoints()
1594 {
1595   return checkpointed;
1596 }
1597
1598 /// @todo: the following definitions should be moved to a separate file containing
1599 // structures and functions about fault tolerance strategies
1600
1601 /**
1602  *  * @brief: function for killing a process                                             
1603  *   */
1604 #ifdef CMK_MEM_CHECKPOINT
1605 #if CMK_HAS_GETPID
1606 void killLocal(void *_dummy,double curWallTime){
1607         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1608         if(CmiWallTimer()<killTime-1){
1609                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1610         }else{ 
1611 #if CMK_CONVERSE_MPI
1612                                 CkDieNow();
1613 #else 
1614                 kill(getpid(),SIGKILL);                                               
1615 #endif
1616         }              
1617
1618 #else
1619 void killLocal(void *_dummy,double curWallTime){
1620   CmiAbort("kill() not supported!");
1621 }
1622 #endif
1623 #endif
1624
1625 #ifdef CMK_MEM_CHECKPOINT
1626 /**
1627  * @brief: reads the file with the kill information
1628  */
1629 void readKillFile(){
1630         FILE *fp=fopen(killFile,"r");
1631         if(!fp){
1632                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1633                 return;
1634         }
1635         int proc;
1636         double sec;
1637         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1638                 if(proc == CkMyNode() && CkMyRank() == 0){
1639                         killTime = CmiWallTimer()+sec;
1640                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1641                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1642                 }
1643         }
1644         fclose(fp);
1645 }
1646
1647 #if ! CMK_CONVERSE_MPI
1648 void CkDieNow()
1649 {
1650          // ignored for non-mpi version
1651         CmiPrintf("[%d] die now.\n", CmiMyPe());
1652         killTime = CmiWallTimer()+0.001;
1653         CcdCallFnAfter(killLocal,NULL,1);
1654 }
1655 #endif
1656
1657 #endif
1658
1659 #include "CkMemCheckpoint.def.h"
1660
1661