more optimization
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         1
70 #define STREAMING_INFORMHOME                    1
71 CpvDeclare(int, _crashedNode);
72
73 // static, so that it is accessible from Converse part
74 int CkMemCheckPT::inRestarting = 0;
75 int CkMemCheckPT::inLoadbalancing = 0;
76 double CkMemCheckPT::startTime;
77 char *CkMemCheckPT::stage;
78 CkCallback CkMemCheckPT::cpCallback;
79
80 int _memChkptOn = 1;                    // checkpoint is on or off
81
82 CkGroupID ckCheckPTGroupID;             // readonly
83
84 static int checkpointed = 0;
85
86 /// @todo the following declarations should be moved into a separate file for all 
87 // fault tolerant strategies
88
89 #ifdef CMK_MEM_CHECKPOINT
90 // name of the kill file that contains processes to be killed 
91 char *killFile;                                               
92 // flag for the kill file         
93 int killFlag=0;
94 // variable for storing the killing time
95 double killTime=0.0;
96 #endif
97
98 #ifdef CKLOCMGR_LOOP
99 #undef CKLOCMGR_LOOP
100 #endif
101 // loop over all CkLocMgr and do "code"
102 #define  CKLOCMGR_LOOP(code)    {       \
103   int numGroups = CkpvAccess(_groupIDTable)->size();    \
104   for(int i=0;i<numGroups;i++) {        \
105     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
106     if(obj->isLocMgr())  {      \
107       CkLocMgr *mgr = (CkLocMgr*)obj;   \
108       code      \
109     }   \
110   }     \
111  }
112
113 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
114 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
115
116 // compute the backup processor
117 // FIXME: avoid crashed processors
118 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
119
120 inline int CkMemCheckPT::BuddyPE(int pe)
121 {
122   int budpe;
123 #if NODE_CHECKPOINT
124     // buddy is the processor with same rank on the next physical node
125   int r1 = CmiPhysicalRank(pe);
126   int budnode = CmiPhysicalNodeID(pe);
127   do {
128     budnode = (budnode+1)%CmiNumPhysicalNodes();
129     int *pelist;
130     int num;
131     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
132     budpe = pelist[r1 % num];
133   } while (isFailed(budpe));
134   if (budpe == pe) {
135     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
136     CmiAbort("Failed to find a buddy processor");
137   }
138 #else
139   budpe = pe;
140   while (budpe == pe || isFailed(budpe)) 
141           budpe = (budpe+1)%CkNumPes();
142 #endif
143   return budpe;
144 }
145
146 // called in array element constructor
147 // choose and register with 2 buddies for checkpoiting 
148 #if CMK_MEM_CHECKPOINT
149 void ArrayElement::init_checkpt() {
150         if (_memChkptOn == 0) return;
151         if (CkInRestarting()) {
152           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
153         }
154         // only master init checkpoint
155         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
156
157         budPEs[0] = CkMyPe();
158         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
159         CmiAssert(budPEs[0] != budPEs[1]);
160         // inform checkPTMgr
161         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
162         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
163         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
164         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
165 }
166 #endif
167
168 // entry function invoked by checkpoint mgr asking for checkpoint data
169 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
170 #if CMK_MEM_CHECKPOINT
171 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
172 //char index[128];   thisIndexMax.sprint(index);
173 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
174   CkLocMgr *locMgr = thisArray->getLocMgr();
175   CmiAssert(myRec!=NULL);
176   int size;
177   {
178         PUP::sizer p;
179         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
180         size = p.size();
181   }
182   int packSize = size/sizeof(double) +1;
183   CkArrayCheckPTMessage *msg =
184                  new (packSize, 0) CkArrayCheckPTMessage;
185   msg->len = size;
186   msg->index =thisIndexMax;
187   msg->aid = thisArrayID;
188   msg->locMgr = locMgr->getGroupID();
189   msg->cp_flag = 1;
190   {
191         PUP::toMem p(msg->packData);
192         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
193   }
194
195   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
196   checkptMgr.recvData(msg, 2, budPEs);
197   delete m;
198 #endif
199 }
200
201 // checkpoint holder class - for memory checkpointing
202 class CkMemCheckPTInfo: public CkCheckPTInfo
203 {
204   CkArrayCheckPTMessage *ckBuffer;
205 public:
206   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
207                     CkCheckPTInfo(a, loc, idx, pno)
208   {
209     ckBuffer = NULL;
210   }
211   ~CkMemCheckPTInfo() 
212   {
213     if (ckBuffer) delete ckBuffer; 
214   }
215   inline void updateBuffer(CkArrayCheckPTMessage *data) 
216   {
217     CmiAssert(data!=NULL);
218     if (ckBuffer) delete ckBuffer;
219     ckBuffer = data;
220   }    
221   inline CkArrayCheckPTMessage * getCopy()
222   {
223     if (ckBuffer == NULL) {
224       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
225       CmiAbort("Abort!");
226     }
227     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
228   }     
229   inline void updateBuddy(int b1, int b2) {
230      CmiAssert(ckBuffer);
231      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
232      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
233      CmiAssert(pNo != CkMyPe());
234   }
235   inline int getSize() { 
236      CmiAssert(ckBuffer);
237      return ckBuffer->len; 
238   }
239 };
240
241 // checkpoint holder class - for in-disk checkpointing
242 class CkDiskCheckPTInfo: public CkCheckPTInfo 
243 {
244   char *fname;
245   int bud1, bud2;
246   int len;                      // checkpoint size
247 public:
248   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
249   {
250 #if CMK_USE_MKSTEMP
251     fname = new char[64];
252     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
253     mkstemp(fname);
254 #else
255     fname=tmpnam(NULL);
256 #endif
257     bud1 = bud2 = -1;
258     len = 0;
259   }
260   ~CkDiskCheckPTInfo() 
261   {
262     remove(fname);
263   }
264   inline void updateBuffer(CkArrayCheckPTMessage *data) 
265   {
266     double t = CmiWallTimer();
267     // unpack it
268     envelope *env = UsrToEnv(data);
269     CkUnpackMessage(&env);
270     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
271     FILE *f = fopen(fname,"wb");
272     PUP::toDisk p(f);
273     CkPupMessage(p, (void **)&data);
274     // delay sync to the end because otherwise the messages are blocked
275 //    fsync(fileno(f));
276     fclose(f);
277     bud1 = data->bud1;
278     bud2 = data->bud2;
279     len = data->len;
280     delete data;
281     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
282   }
283   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
284   {
285     CkArrayCheckPTMessage *data;
286     FILE *f = fopen(fname,"rb");
287     PUP::fromDisk p(f);
288     CkPupMessage(p, (void **)&data);
289     fclose(f);
290     data->bud1 = bud1;                          // update the buddies
291     data->bud2 = bud2;
292     return data;
293   }
294   inline void updateBuddy(int b1, int b2) {
295      bud1 = b1; bud2 = b2;
296      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
297      CmiAssert(pNo != CkMyPe());
298   }
299   inline int getSize() { 
300      return len; 
301   }
302 };
303
304 CkMemCheckPT::CkMemCheckPT(int w)
305 {
306   int numnodes = 0;
307 #if NODE_CHECKPOINT
308   numnodes = CmiNumPhysicalNodes();
309 #else
310   numnodes = CkNumPes();
311 #endif
312 #if CK_NO_PROC_POOL
313   if (numnodes <= 2)
314 #else
315   if (numnodes  == 1)
316 #endif
317   {
318     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
319     _memChkptOn = 0;
320   }
321   inRestarting = 0;
322   recvCount = peCount = 0;
323   ackCount = 0;
324   expectCount = -1;
325   where = w;
326
327 #if CMK_CONVERSE_MPI
328   void pingBuddy();
329   void pingCheckHandler();
330   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
331   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
332 #endif
333 }
334
335 CkMemCheckPT::~CkMemCheckPT()
336 {
337   int len = ckTable.length();
338   for (int i=0; i<len; i++) {
339     delete ckTable[i];
340   }
341 }
342
343 void CkMemCheckPT::pup(PUP::er& p) 
344
345   CBase_CkMemCheckPT::pup(p); 
346   p|cpStarter;
347   p|thisFailedPe;
348   p|failedPes;
349   p|ckCheckPTGroupID;           // recover global variable
350   p|cpCallback;                 // store callback
351   p|where;                      // where to checkpoint
352   p|peCount;
353   if (p.isUnpacking()) {
354     recvCount = 0;
355 #if CMK_CONVERSE_MPI
356     void pingBuddy();
357     void pingCheckHandler();
358     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
359     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
360 #endif
361   }
362 }
363
364 // called by checkpoint mgr to restore an array element
365 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
366 {
367 #if CMK_MEM_CHECKPOINT
368   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
369   // m->index.print();
370   PUP::fromMem p(m->packData);
371   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
372   CmiAssert(mgr);
373 #if STREAMING_INFORMHOME
374   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
375 #else
376   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
377 #endif
378
379   // find a list of array elements bound together
380   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
381   CmiAssert(elt);
382   CkLocRec_local *rec = elt->myRec;
383   CkVec<CkMigratable *> list;
384   mgr->migratableList(rec, list);
385   CmiAssert(list.length() > 0);
386   for (int l=0; l<list.length(); l++) {
387     elt = (ArrayElement *)list[l];
388     elt->budPEs[0] = m->bud1;
389     elt->budPEs[1] = m->bud2;
390     //    reset, may not needed now
391     // for now.
392     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
393       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
394       if (c) c->redNo = 0;
395     }
396   }
397 #endif
398 }
399
400 // return 1 if pe is a crashed processor
401 int CkMemCheckPT::isFailed(int pe)
402 {
403   for (int i=0; i<failedPes.length(); i++)
404     if (failedPes[i] == pe) return 1;
405   return 0;
406 }
407
408 // add pe into history list of all failed processors
409 void CkMemCheckPT::failed(int pe)
410 {
411   if (isFailed(pe)) return;
412   failedPes.push_back(pe);
413 }
414
415 int CkMemCheckPT::totalFailed()
416 {
417   return failedPes.length();
418 }
419
420 // create an checkpoint entry for array element of aid with index.
421 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
422 {
423   // error check, no duplicate
424   int idx, len = ckTable.size();
425   for (idx=0; idx<len; idx++) {
426     CkCheckPTInfo *entry = ckTable[idx];
427     if (index == entry->index) {
428       if (loc == entry->locMgr) {
429           // bindTo array elements
430           return;
431       }
432         // for array inheritance, the following check may fail
433         // because ArrayElement constructor of all superclasses are called
434       if (aid == entry->aid) {
435         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
436         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
437       }
438     }
439   }
440   CkCheckPTInfo *newEntry;
441   if (where == CkCheckPoint_inMEM)
442     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
443   else
444     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
445   ckTable.push_back(newEntry);
446   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
447 }
448
449 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
450 {
451 #if !CMK_CHKP_ALL       
452   int buddy = msg->bud1;
453   if (buddy == CkMyPe()) buddy = msg->bud2;
454   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
455   recvData(msg);
456     // ack
457   thisProxy[buddy].gotData();
458 #else
459   recvArrayCheckpoint(msg);
460   thisProxy[msg->bud2].gotData();
461 #endif
462 }
463
464 // loop through my checkpoint table and ask checkpointed array elements
465 // to send me checkpoint data.
466 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
467 {
468   checkpointed = 1;
469   cpCallback = cb;
470   cpStarter = starter;
471   if (CkMyPe() == cpStarter) {
472     startTime = CmiWallTimer();
473     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
474   }
475 #if !CMK_CHKP_ALL
476   int len = ckTable.length();
477   for (int i=0; i<len; i++) {
478     CkCheckPTInfo *entry = ckTable[i];
479       // always let the bigger number processor send request
480     //if (CkMyPe() < entry->pNo) continue;
481       // always let the smaller number processor send request, may on same proc
482     if (!isMaster(entry->pNo)) continue;
483       // call inmem_checkpoint to the array element, ask it to send
484       // back checkpoint data via recvData().
485     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
486     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
487   }
488     // if my table is empty, then I am done
489   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
490 #else
491   startArrayCheckpoint();
492 #endif
493   // pack and send proc level data
494   sendProcData();
495 }
496
497 class MemElementPacker : public CkLocIterator{
498         private:
499                 CkLocMgr *locMgr;
500                 PUP::er &p;
501         public:
502                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
503                 void addLocation(CkLocation &loc){
504                         CkArrayIndexMax idx = loc.getIndex();
505                         CkGroupID gID = locMgr->ckGetGroupID();
506                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
507                         CmiAssert(elt);
508                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
509                         p|gID;
510                         p|idx;
511                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
512                 }
513 };
514
515 void CkMemCheckPT::pupAllElements(PUP::er &p){
516 #if CMK_CHKP_ALL
517         int numElements;
518         if(!p.isUnpacking()){
519                 numElements = CkCountArrayElements();
520         }
521         p | numElements;
522         if(!p.isUnpacking()){
523                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
524         }
525 #endif
526 }
527
528 void CkMemCheckPT::startArrayCheckpoint(){
529 #if CMK_CHKP_ALL
530         int size;
531         {
532                 PUP::sizer psizer;
533                 pupAllElements(psizer);
534                 size = psizer.size();
535         }
536         int packSize = size/sizeof(double)+1;
537         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
538         msg->len = size;
539         msg->cp_flag = 1;
540         int budPEs[2];
541         msg->bud1=CkMyPe();
542         msg->bud2=ChkptOnPe(CkMyPe());
543         {
544                 PUP::toMem p(msg->packData);
545                 pupAllElements(p);
546         }
547         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
548         chkpTable[0] = msg;
549         recvCount++;
550 #endif
551 }
552
553 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
554 {
555 #if CMK_CHKP_ALL
556         int idx = 1;
557         if(msg->bud1 == CkMyPe()){
558                 idx = 0;
559         }
560         int isChkpting = msg->cp_flag;
561         chkpTable[idx] = msg;
562         if(isChkpting){
563                 recvCount++;
564                 if(recvCount == 2){
565                   if (where == CkCheckPoint_inMEM) {
566                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
567                   }
568                   else if (where == CkCheckPoint_inDISK) {
569                         // another barrier for finalize the writing using fsync
570                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
571                         contribute(0,NULL,CkReduction::sum_int,localcb);
572                   }
573                   else
574                         CmiAbort("Unknown checkpoint scheme");
575                   recvCount = 0;
576                 }
577         }
578 #endif
579 }
580
581 // don't handle array elements
582 static inline void _handleProcData(PUP::er &p)
583 {
584     // save readonlys, and callback BTW
585     CkPupROData(p);
586
587     // save mainchares 
588     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
589         
590 #ifndef CMK_CHARE_USE_PTR
591     // save non-migratable chare
592     CkPupChareData(p);
593 #endif
594
595     // save groups into Groups.dat
596     CkPupGroupData(p);
597
598     // save nodegroups into NodeGroups.dat
599     if(CkMyRank()==0) CkPupNodeGroupData(p);
600 }
601
602 void CkMemCheckPT::sendProcData()
603 {
604   // find out size of buffer
605   int size;
606   {
607     PUP::sizer p;
608     _handleProcData(p);
609     size = p.size();
610   }
611   int packSize = size;
612   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
613   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
614   {
615     PUP::toMem p(msg->packData);
616     _handleProcData(p);
617   }
618   msg->pe = CkMyPe();
619   msg->len = size;
620   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
621   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
622 }
623
624 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
625 {
626   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
627   CpvAccess(procChkptBuf) = msg;
628   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
629   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
630 }
631
632 // ArrayElement call this function to give us the checkpointed data
633 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
634 {
635   int len = ckTable.length();
636   int idx;
637   for (idx=0; idx<len; idx++) {
638     CkCheckPTInfo *entry = ckTable[idx];
639     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
640   }
641   CkAssert(idx < len);
642   int isChkpting = msg->cp_flag;
643   ckTable[idx]->updateBuffer(msg);
644   if (isChkpting) {
645       // all my array elements have returned their inmem data
646       // inform starter processor that I am done.
647     recvCount ++;
648     if (recvCount == ckTable.length()) {
649       if (where == CkCheckPoint_inMEM) {
650         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
651       }
652       else if (where == CkCheckPoint_inDISK) {
653         // another barrier for finalize the writing using fsync
654         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
655         contribute(0,NULL,CkReduction::sum_int,localcb);
656       }
657       else
658         CmiAbort("Unknown checkpoint scheme");
659       recvCount = 0;
660     } 
661   }
662 }
663
664 // only used in disk checkpointing
665 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
666 {
667   delete m;
668 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
669   system("sync");
670 #endif
671   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
672 }
673
674 // only is called on cpStarter when checkpoint is done
675 void CkMemCheckPT::cpFinish()
676 {
677   CmiAssert(CkMyPe() == cpStarter);
678   peCount++;
679     // now that all processors have finished, activate callback
680   if (peCount == 2) {
681     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
682     cpCallback.send();
683     peCount = 0;
684 #if !CMK_CHKP_ALL
685     thisProxy.report();
686 #endif
687   }
688 }
689
690 // for debugging, report checkpoint info
691 void CkMemCheckPT::report()
692 {
693   int objsize = 0;
694   int len = ckTable.length();
695   for (int i=0; i<len; i++) {
696     CkCheckPTInfo *entry = ckTable[i];
697     CmiAssert(entry);
698     objsize += entry->getSize();
699   }
700   CmiAssert(CpvAccess(procChkptBuf));
701 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
702 }
703
704 /*****************************************************************************
705                         RESTART Procedure
706 *****************************************************************************/
707
708 // master processor of two buddies
709 inline int CkMemCheckPT::isMaster(int buddype)
710 {
711 #if 0
712   int mype = CkMyPe();
713 //CkPrintf("ismaster: %d %d\n", pe, mype);
714   if (CkNumPes() - totalFailed() == 2) {
715     return mype > buddype;
716   }
717   for (int i=1; i<CkNumPes(); i++) {
718     int me = (buddype+i)%CkNumPes();
719     if (isFailed(me)) continue;
720     if (me == mype) return 1;
721     else return 0;
722   }
723   return 0;
724 #else
725     // smaller one
726   int mype = CkMyPe();
727 //CkPrintf("ismaster: %d %d\n", pe, mype);
728   if (CkNumPes() - totalFailed() == 2) {
729     return mype < buddype;
730   }
731 #if NODE_CHECKPOINT
732   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
733   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
734 #else
735   for (int i=1; i<CkNumPes(); i++) {
736 #endif
737     int me = (mype+i)%CkNumPes();
738     if (isFailed(me)) continue;
739     if (me == buddype) return 1;
740     else return 0;
741   }
742   return 0;
743 #endif
744 }
745
746
747
748 #if 0
749 // helper class to pup all elements that belong to same ckLocMgr
750 class ElementDestoryer : public CkLocIterator {
751 private:
752         CkLocMgr *locMgr;
753 public:
754         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
755         void addLocation(CkLocation &loc) {
756                 CkArrayIndex idx=loc.getIndex();
757                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
758                 loc.destroy();
759         }
760 };
761 #endif
762
763 // restore the bitmap vector for LB
764 void CkMemCheckPT::resetLB(int diepe)
765 {
766 #if CMK_LBDB_ON
767   int i;
768   char *bitmap = new char[CkNumPes()];
769   // set processor available bitmap
770   get_avail_vector(bitmap);
771
772   for (i=0; i<failedPes.length(); i++)
773     bitmap[failedPes[i]] = 0; 
774   bitmap[diepe] = 0;
775
776 #if CK_NO_PROC_POOL
777   set_avail_vector(bitmap);
778 #endif
779
780   // if I am the crashed pe, rebuild my failedPEs array
781   if (CkMyNode() == diepe)
782     for (i=0; i<CkNumPes(); i++) 
783       if (bitmap[i]==0) failed(i);
784
785   delete [] bitmap;
786 #endif
787 }
788
789 // in case when failedPe dies, everybody go through its checkpoint table:
790 // destory all array elements
791 // recover lost buddies
792 // reconstruct all array elements from check point data
793 // called on all processors
794 void CkMemCheckPT::restart(int diePe)
795 {
796 #if CMK_MEM_CHECKPOINT
797   double curTime = CmiWallTimer();
798   if (CkMyPe() == diePe)
799     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
800   stage = (char*)"resetLB";
801   startTime = curTime;
802   if (CkMyPe() == diePe)
803     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
804
805 #if CK_NO_PROC_POOL
806   failed(diePe);        // add into the list of failed pes
807 #endif
808   thisFailedPe = diePe;
809
810   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
811
812   inRestarting = 1;
813                                                                                 
814   // disable load balancer's barrier
815   if (CkMyPe() != diePe) resetLB(diePe);
816
817   CKLOCMGR_LOOP(mgr->startInserting(););
818
819   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
820   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
821 /*
822   if (CkMyPe() == 0)
823     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
824 */
825 #endif
826 }
827
828 // loally remove all array elements
829 void CkMemCheckPT::removeArrayElements()
830 {
831 #if CMK_MEM_CHECKPOINT
832   int len = ckTable.length();
833   double curTime = CmiWallTimer();
834   if (CkMyPe() == thisFailedPe) 
835     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
836   stage = (char*)"removeArrayElements";
837   startTime = curTime;
838
839   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
840   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
841
842   // get rid of all buffering and remote recs
843   // including destorying all array elements
844   CKLOCMGR_LOOP(mgr->flushAllRecs(););
845
846 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
847
848   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
849   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
850 #endif
851 }
852
853 // flush state in reduction manager
854 void CkMemCheckPT::resetReductionMgr()
855 {
856   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
857   int numGroups = CkpvAccess(_groupIDTable)->size();
858   for(int i=0;i<numGroups;i++) {
859     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
860     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
861     obj->flushStates();
862     obj->ckJustMigrated();
863   }
864   // reset again
865   //CpvAccess(_qd)->flushStates();
866
867 #if 1
868   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
869   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
870 #else
871   if (CkMyPe() == 0)
872     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
873 #endif
874 }
875
876 // recover the lost buddies
877 void CkMemCheckPT::recoverBuddies()
878 {
879   int idx;
880   int len = ckTable.length();
881   // ready to flush reduction manager
882   // cannot be CkMemCheckPT::restart because destory will modify states
883   double curTime = CmiWallTimer();
884   if (CkMyPe() == thisFailedPe)
885   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
886   stage = (char *)"recoverBuddies";
887   if (CkMyPe() == thisFailedPe)
888   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
889   startTime = curTime;
890
891   // recover buddies
892   expectCount = 0;
893 #if !CMK_CHKP_ALL
894   for (idx=0; idx<len; idx++) {
895     CkCheckPTInfo *entry = ckTable[idx];
896     if (entry->pNo == thisFailedPe) {
897 #if CK_NO_PROC_POOL
898       // find a new buddy
899       int budPe = BuddyPE(CkMyPe());
900 #else
901       int budPe = thisFailedPe;
902 #endif
903       CkArrayCheckPTMessage *msg = entry->getCopy();
904       msg->bud1 = budPe;
905       msg->bud2 = CkMyPe();
906       msg->cp_flag = 0;            // not checkpointing
907       thisProxy[budPe].recoverEntry(msg);
908       expectCount ++;
909     }
910   }
911 #else
912   //send to failed pe
913   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
914 #if CK_NO_PROC_POOL
915       // find a new buddy
916       int budPe = BuddyPE(CkMyPe());
917 #else
918       int budPe = thisFailedPe;
919 #endif
920       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
921       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
922           msg->cp_flag = 0;            // not checkpointing
923       msg->bud1 = budPe;
924       msg->bud2 = CkMyPe();
925       thisProxy[budPe].recoverEntry(msg);
926       expectCount ++;
927   }
928 #endif
929
930 #if 1
931   if (expectCount == 0) {
932     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
933     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
934   }
935 #else
936   if (CkMyPe() == 0) {
937     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
938   }
939 #endif
940
941   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
942 }
943
944 void CkMemCheckPT::gotData()
945 {
946   ackCount ++;
947   if (ackCount == expectCount) {
948     ackCount = 0;
949     expectCount = -1;
950     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
951     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
952   }
953 }
954
955 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
956 {
957   for (int i=0; i<n; i++) {
958     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
959     mgr->updateLocation(idx[i], nowOnPe);
960   }
961 }
962
963 // restore array elements
964 void CkMemCheckPT::recoverArrayElements()
965 {
966   double curTime = CmiWallTimer();
967   int len = ckTable.length();
968   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
969   stage = (char *)"recoverArrayElements";
970   if (CkMyPe() == thisFailedPe)
971   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
972   startTime = curTime;
973  int flag = 0;
974   // recover all array elements
975   int count = 0;
976 #if STREAMING_INFORMHOME
977   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
978   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
979 #endif
980 #if !CMK_CHKP_ALL
981   for (int idx=0; idx<len; idx++)
982   {
983     CkCheckPTInfo *entry = ckTable[idx];
984 #if CK_NO_PROC_POOL
985     // the bigger one will do 
986 //    if (CkMyPe() < entry->pNo) continue;
987     if (!isMaster(entry->pNo)) continue;
988 #else
989     // smaller one do it, which has the original object
990     if (CkMyPe() == entry->pNo+1 || 
991         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
992 #endif
993 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
994
995     entry->updateBuddy(CkMyPe(), entry->pNo);
996     CkArrayCheckPTMessage *msg = entry->getCopy();
997     // gzheng
998     //thisProxy[CkMyPe()].inmem_restore(msg);
999     inmem_restore(msg);
1000 #if STREAMING_INFORMHOME
1001     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1002     int homePe = mgr->homePe(msg->index);
1003     if (homePe != CkMyPe()) {
1004       gmap[homePe].push_back(msg->locMgr);
1005       imap[homePe].push_back(msg->index);
1006     }
1007 #endif
1008     CkFreeMsg(msg);
1009     count ++;
1010   }
1011 #else
1012         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1013         recoverAll(msg,gmap,imap);
1014     CkFreeMsg(msg);
1015 #endif
1016   curTime = CmiWallTimer();
1017   if (CkMyPe() == thisFailedPe)
1018         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1019 #if STREAMING_INFORMHOME
1020   for (int i=0; i<CkNumPes(); i++) {
1021     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1022       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1023         CkPrintf("[%d] send to %d\n",CkMyPe(),i);
1024         flag++; 
1025         }
1026   }
1027   delete [] imap;
1028   delete [] gmap;
1029 #endif
1030   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1031
1032   CKLOCMGR_LOOP(mgr->doneInserting(););
1033
1034   // _crashedNode = -1;
1035   CpvAccess(_crashedNode) = -1;
1036 if(CkMyPe()!=thisFailedPe)
1037   inRestarting = 0;
1038  // if (CkMyPe() == 0)
1039    // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1040 if(flag == 0)
1041 {
1042     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1043 }
1044 }
1045
1046 void CkMemCheckPT::gotReply(){
1047     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1048 }
1049
1050 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1051 #if CMK_CHKP_ALL
1052         PUP::fromMem p(msg->packData);
1053         int numElements;
1054         p|numElements;
1055         if(p.isUnpacking()){
1056                 for(int i=0;i<numElements;i++){
1057                         CkGroupID gID;
1058                         CkArrayIndex idx;
1059                         p|gID;
1060                         p|idx;
1061                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1062 #if STREAMING_INFORMHOME
1063                         mgr->resume(idx,p,CmiFalse);
1064 #else
1065                         mgr->resume(idx,p,CmiTrue);
1066 #endif
1067                           /*CkLocRec_local *rec = loc.getLocalRecord();
1068                           CmiAssert(rec);
1069                           CkVec<CkMigratable *> list;
1070                           mgr->migratableList(rec, list);
1071                           CmiAssert(list.length() > 0);
1072                           for (int l=0; l<list.length(); l++) {
1073                                 ArrayElement * elt = (ArrayElement *)list[l];
1074                                 //    reset, may not needed now
1075                                 // for now.
1076                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1077                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1078                                   if (c) c->redNo = 0;
1079                                 }
1080                           }*/
1081 #if STREAMING_INFORMHOME
1082                         int homePe = mgr->homePe(idx);
1083                         if (homePe != CkMyPe()) {
1084                           gmap[homePe].push_back(gID);
1085                           imap[homePe].push_back(idx);
1086                         }
1087 #endif
1088                 }
1089         }
1090 #endif
1091 }
1092
1093 static double restartT;
1094
1095 // on every processor
1096 // turn load balancer back on
1097 void CkMemCheckPT::finishUp()
1098 {
1099   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1100   //CKLOCMGR_LOOP(mgr->doneInserting(););
1101   
1102   if (CkMyPe() == thisFailedPe)
1103   {
1104         inRestarting=0;
1105        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1106        //CkStartQD(cpCallback);
1107        cpCallback.send();
1108        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1109   }
1110
1111 #if CK_NO_PROC_POOL
1112 #if NODE_CHECKPOINT
1113   int numnodes = CmiNumPhysicalNodes();
1114 #else
1115   int numnodes = CkNumPes();
1116 #endif
1117   if (numnodes-totalFailed() <=2) {
1118     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1119     _memChkptOn = 0;
1120   }
1121 #endif
1122 }
1123
1124 // called only on 0
1125 void CkMemCheckPT::quiescence(CkCallback &cb)
1126 {
1127   static int pe_count = 0;
1128   pe_count ++;
1129   CmiAssert(CkMyPe() == 0);
1130   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1131   if (pe_count == CkNumPes()) {
1132     pe_count = 0;
1133     cb.send();
1134   }
1135 }
1136
1137 // User callable function - to start a checkpoint
1138 // callback cb is used to pass control back
1139 void CkStartMemCheckpoint(CkCallback &cb)
1140 {
1141 #if CMK_MEM_CHECKPOINT
1142   if (_memChkptOn == 0) {
1143     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1144     cb.send();
1145     return;
1146   }
1147   if (CkInRestarting()) {
1148       // trying to checkpointing during restart
1149     cb.send();
1150     return;
1151   }
1152     // store user callback and user data
1153   CkMemCheckPT::cpCallback = cb;
1154
1155     // broadcast to start check pointing
1156   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1157   checkptMgr.doItNow(CkMyPe(), cb);
1158 #else
1159   // when mem checkpoint is disabled, invike cb immediately
1160   cb.send();
1161 #endif
1162 }
1163
1164 void CkRestartCheckPoint(int diePe)
1165 {
1166 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1167   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1168   // broadcast
1169   checkptMgr.restart(diePe);
1170 }
1171
1172 static int _diePE = -1;
1173
1174 // callback function used locally by ccs handler
1175 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1176 {
1177 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1178   CkRestartCheckPoint(_diePE);
1179 }
1180
1181 // Converse function handles
1182 static int askPhaseHandlerIdx;
1183 static int recvPhaseHandlerIdx;
1184 static int askProcDataHandlerIdx;
1185 static int restartBcastHandlerIdx;
1186 static int recoverProcDataHandlerIdx;
1187 static int restartBeginHandlerIdx;
1188 static int notifyHandlerIdx;
1189
1190 // called on crashed PE
1191 static void restartBeginHandler(char *msg)
1192 {
1193   CmiFree(msg);
1194 #if CMK_MEM_CHECKPOINT
1195 #if CMK_USE_BARRIER
1196         if(CkMyPe()!=_diePE){
1197                 printf("restar begin on %d\n",CkMyPe());
1198                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1199                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1200                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1201         }else{
1202         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1203         CkRestartCheckPointCallback(NULL, NULL);
1204         }
1205 #else
1206   static int count = 0;
1207   CmiAssert(CkMyPe() == _diePE);
1208   count ++;
1209   if (count == CkNumPes()) {
1210     CkRestartCheckPointCallback(NULL, NULL);
1211     count = 0;
1212   }
1213 #endif
1214 #endif
1215 }
1216
1217 extern void _discard_charm_message();
1218 extern void _resume_charm_message();
1219
1220 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1221         return data;
1222 }
1223
1224 static void restartBcastHandler(char *msg)
1225 {
1226 #if CMK_MEM_CHECKPOINT
1227   // advance phase counter
1228   CkMemCheckPT::inRestarting = 1;
1229   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1230   // gzheng
1231   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1232
1233   if (CkMyPe()==_diePE)
1234     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1235
1236   // reset QD counters
1237 /*  gzheng
1238   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1239 */
1240
1241 /*  gzheng
1242   if (CkMyPe()==_diePE)
1243       CkRestartCheckPointCallback(NULL, NULL);
1244 */
1245   CmiFree(msg);
1246
1247   _resume_charm_message();
1248
1249     // reduction
1250   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1251   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1252 #if CMK_USE_BARRIER
1253         //CmiPrintf("before reduce\n"); 
1254         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1255         //CmiPrintf("after reduce\n");  
1256 #else
1257   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1258 #endif 
1259  checkpointed = 0;
1260 #endif
1261 }
1262
1263 extern void _initDone();
1264
1265 // called on crashed processor
1266 static void recoverProcDataHandler(char *msg)
1267 {
1268 #if CMK_MEM_CHECKPOINT
1269    int i;
1270    envelope *env = (envelope *)msg;
1271    CkUnpackMessage(&env);
1272    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1273    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1274    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1275    //cur_restart_phase ++;
1276      // gzheng ?
1277    //CpvAccess(_qd)->flushStates();
1278
1279    // restore readonly, mainchare, group, nodegroup
1280 //   int temp = cur_restart_phase;
1281 //   cur_restart_phase = -1;
1282    PUP::fromMem p(procMsg->packData);
1283    _handleProcData(p);
1284
1285    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1286    // gzheng
1287    CKLOCMGR_LOOP(mgr->startInserting(););
1288
1289    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1290    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1291    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1292    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1293
1294    _initDone();
1295 //   CpvAccess(_qd)->flushStates();
1296    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1297 #endif
1298 }
1299
1300 // called on its backup processor
1301 // get backup message buffer and sent to crashed processor
1302 static void askProcDataHandler(char *msg)
1303 {
1304 #if CMK_MEM_CHECKPOINT
1305     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1306     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1307     if (CpvAccess(procChkptBuf) == NULL)  {
1308       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1309       CkAbort("no checkpoint found");
1310     }
1311     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1312     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1313
1314     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1315
1316     CkPackMessage(&env);
1317     CmiSetHandler(env, recoverProcDataHandlerIdx);
1318     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1319     CpvAccess(procChkptBuf) = NULL;
1320 #endif
1321 }
1322
1323 // called on PE 0
1324 void qd_callback(void *m)
1325 {
1326    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1327    CkFreeMsg(m);
1328 #ifdef CMK_SMP
1329    for(int i=0;i<CmiMyNodeSize();i++){
1330    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1331    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1332         CmiSetHandler(msg, askProcDataHandlerIdx);
1333         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1334         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1335    }
1336    return;
1337 #endif
1338    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1339    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1340    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1341    CmiSetHandler(msg, askProcDataHandlerIdx);
1342    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1343    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1344
1345 }
1346
1347 // on crashed node
1348 void CkMemRestart(const char *dummy, CkArgMsg *args)
1349 {
1350 #if CMK_MEM_CHECKPOINT
1351    _diePE = CmiMyNode();
1352    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1353    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1354    CkMemCheckPT::inRestarting = 1;
1355
1356   CpvAccess( _crashedNode )= CmiMyNode();
1357         
1358   _discard_charm_message();
1359   
1360   if(CmiMyRank()==0){
1361     CkCallback cb(qd_callback);
1362     CkStartQD(cb);
1363     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1364   }
1365 #else
1366    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1367 #endif
1368 }
1369
1370 // can be called in other files
1371 // return true if it is in restarting
1372 extern "C"
1373 int CkInRestarting()
1374 {
1375 #if CMK_MEM_CHECKPOINT
1376   if (CpvAccess( _crashedNode)!=-1) return 1;
1377   // gzheng
1378   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1379   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1380   return CkMemCheckPT::inRestarting;
1381 #else
1382   return 0;
1383 #endif
1384 }
1385
1386 extern "C"
1387 void CkSetInLdb(){
1388 #if CMK_MEM_CHECKPOINT
1389         CkMemCheckPT::inLoadbalancing = 1;
1390 #endif
1391 }
1392
1393 extern "C"
1394 int CkInLdb(){
1395 #if CMK_MEM_CHECKPOINT
1396         return CkMemCheckPT::inLoadbalancing;
1397 #endif
1398 }
1399
1400 extern "C"
1401 void CkResetInLdb(){
1402 #if CMK_MEM_CHECKPOINT
1403         CkMemCheckPT::inLoadbalancing = 0;
1404 #endif
1405 }
1406
1407 /*****************************************************************************
1408                 module initialization
1409 *****************************************************************************/
1410
1411 static int arg_where = CkCheckPoint_inMEM;
1412
1413 #if CMK_MEM_CHECKPOINT
1414 void init_memcheckpt(char **argv)
1415 {
1416     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1417       arg_where = CkCheckPoint_inDISK;
1418     }
1419
1420         // initiliazing _crashedNode variable
1421         CpvInitialize(int, _crashedNode);
1422         CpvAccess(_crashedNode) = -1;
1423
1424 }
1425 #endif
1426
1427 class CkMemCheckPTInit: public Chare {
1428 public:
1429   CkMemCheckPTInit(CkArgMsg *m) {
1430 #if CMK_MEM_CHECKPOINT
1431     if (arg_where == CkCheckPoint_inDISK) {
1432       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1433     }
1434     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1435     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1436 #endif
1437   }
1438 };
1439
1440 static void notifyHandler(char *msg)
1441 {
1442 #if CMK_MEM_CHECKPOINT
1443   CmiFree(msg);
1444       /* immediately increase restart phase to filter old messages */
1445   CpvAccess(_curRestartPhase) ++;
1446   CpvAccess(_qd)->flushStates();
1447   _discard_charm_message();
1448 #endif
1449 }
1450
1451 extern "C"
1452 void notify_crash(int node)
1453 {
1454 #ifdef CMK_MEM_CHECKPOINT
1455   CpvAccess( _crashedNode) = node;
1456 #ifdef CMK_SMP
1457   for(int i=0;i<CkMyNodeSize();i++){
1458         CpvAccessOther(_crashedNode,i)=node;
1459   }
1460 #endif
1461   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1462   CkMemCheckPT::inRestarting = 1;
1463
1464     // this may be in interrupt handler, send a message to reset QD
1465   int pe = CmiNodeFirst(CkMyNode());
1466   for(int i=0;i<CkMyNodeSize();i++){
1467         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1468         CmiSetHandler(msg, notifyHandlerIdx);
1469         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1470   }
1471 #endif
1472 }
1473
1474 extern "C" void (*notify_crash_fn)(int node);
1475
1476 #if CMK_CONVERSE_MPI
1477 static int pingHandlerIdx;
1478 static int pingCheckHandlerIdx;
1479 static int buddyDieHandlerIdx;
1480 static double lastPingTime = -1;
1481
1482 extern "C" void mpi_restart_crashed(int pe, int rank);
1483 extern "C" int  find_spare_mpirank(int pe);
1484
1485 void pingBuddy();
1486 void pingCheckHandler();
1487
1488 void buddyDieHandler(char *msg)
1489 {
1490 #if CMK_MEM_CHECKPOINT
1491    // notify
1492    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1493    notify_crash(diepe);
1494    // send message to crash pe to let it restart
1495    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1496    int newrank = find_spare_mpirank(diepe);
1497    int buddy = obj->BuddyPE(CmiMyPe());
1498    if (buddy == diepe)  {
1499      mpi_restart_crashed(diepe, newrank);
1500      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1501    }
1502 #endif
1503 }
1504
1505 void pingHandler(void *msg)
1506 {
1507   lastPingTime = CmiWallTimer();
1508   CmiFree(msg);
1509 }
1510
1511 void pingCheckHandler()
1512 {
1513 #if CMK_MEM_CHECKPOINT
1514   double now = CmiWallTimer();
1515   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1516     int i, pe, buddy;
1517     // tell everyone the buddy dies
1518     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1519     for (i = 1; i < CmiNumPes(); i++) {
1520        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1521        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1522     }
1523     buddy = pe;
1524     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1525     for (int pe = 0; pe < CmiNumPes(); pe++) {
1526       if (obj->isFailed(pe) || pe == buddy) continue;
1527       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1528       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1529       CmiSetHandler(msg, buddyDieHandlerIdx);
1530       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1531     }
1532   }
1533   else 
1534     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1535 #endif
1536 }
1537
1538 void pingBuddy()
1539 {
1540 #if CMK_MEM_CHECKPOINT
1541   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1542   if (obj) {
1543     int buddy = obj->BuddyPE(CkMyPe());
1544 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1545     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1546     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1547     CmiSetHandler(msg, pingHandlerIdx);
1548     CmiGetRestartPhase(msg) = 9999;
1549     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1550   }
1551   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1552 #endif
1553 }
1554 #endif
1555
1556 // initproc
1557 void CkRegisterRestartHandler( )
1558 {
1559 #if CMK_MEM_CHECKPOINT
1560   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1561   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1562   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1563   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1564   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1565
1566 #if CMK_CONVERSE_MPI
1567   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1568   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1569   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1570 #endif
1571
1572   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1573   CpvAccess(procChkptBuf) = NULL;
1574
1575   notify_crash_fn = notify_crash;
1576
1577 #if ! CMK_CONVERSE_MPI
1578   // print pid to kill
1579 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1580 //  sleep(4);
1581 #endif
1582 #endif
1583 }
1584
1585
1586 extern "C"
1587 int CkHasCheckpoints()
1588 {
1589   return checkpointed;
1590 }
1591
1592 /// @todo: the following definitions should be moved to a separate file containing
1593 // structures and functions about fault tolerance strategies
1594
1595 /**
1596  *  * @brief: function for killing a process                                             
1597  *   */
1598 #ifdef CMK_MEM_CHECKPOINT
1599 #if CMK_HAS_GETPID
1600 void killLocal(void *_dummy,double curWallTime){
1601         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1602         if(CmiWallTimer()<killTime-1){
1603                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1604         }else{ 
1605 #if CMK_CONVERSE_MPI
1606                                 CkDieNow();
1607 #else 
1608                 kill(getpid(),SIGKILL);                                               
1609 #endif
1610         }              
1611
1612 #else
1613 void killLocal(void *_dummy,double curWallTime){
1614   CmiAbort("kill() not supported!");
1615 }
1616 #endif
1617 #endif
1618
1619 #ifdef CMK_MEM_CHECKPOINT
1620 /**
1621  * @brief: reads the file with the kill information
1622  */
1623 void readKillFile(){
1624         FILE *fp=fopen(killFile,"r");
1625         if(!fp){
1626                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1627                 return;
1628         }
1629         int proc;
1630         double sec;
1631         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1632                 if(proc == CkMyNode() && CkMyRank() == 0){
1633                         killTime = CmiWallTimer()+sec;
1634                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1635                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1636                 }
1637         }
1638         fclose(fp);
1639 }
1640
1641 #if ! CMK_CONVERSE_MPI
1642 void CkDieNow()
1643 {
1644          // ignored for non-mpi version
1645         CmiPrintf("[%d] die now.\n", CmiMyPe());
1646         killTime = CmiWallTimer()+0.001;
1647         CcdCallFnAfter(killLocal,NULL,1);
1648 }
1649 #endif
1650
1651 #endif
1652
1653 #include "CkMemCheckpoint.def.h"
1654
1655