optimize chkp after ldb
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 #ifdef CKLOCMGR_LOOP
97 #undef CKLOCMGR_LOOP
98 #endif
99 // loop over all CkLocMgr and do "code"
100 #define  CKLOCMGR_LOOP(code)    {       \
101   int numGroups = CkpvAccess(_groupIDTable)->size();    \
102   for(int i=0;i<numGroups;i++) {        \
103     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
104     if(obj->isLocMgr())  {      \
105       CkLocMgr *mgr = (CkLocMgr*)obj;   \
106       code      \
107     }   \
108   }     \
109  }
110
111 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
112 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
113
114 // compute the backup processor
115 // FIXME: avoid crashed processors
116 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
117
118 inline int CkMemCheckPT::BuddyPE(int pe)
119 {
120   int budpe;
121 #if NODE_CHECKPOINT
122     // buddy is the processor with same rank on the next physical node
123   int r1 = CmiPhysicalRank(pe);
124   int budnode = CmiPhysicalNodeID(pe);
125   do {
126     budnode = (budnode+1)%CmiNumPhysicalNodes();
127     int *pelist;
128     int num;
129     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
130     budpe = pelist[r1 % num];
131   } while (isFailed(budpe));
132   if (budpe == pe) {
133     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
134     CmiAbort("Failed to find a buddy processor");
135   }
136 #else
137   budpe = pe;
138   while (budpe == pe || isFailed(budpe)) 
139           budpe = (budpe+1)%CkNumPes();
140 #endif
141   return budpe;
142 }
143
144 // called in array element constructor
145 // choose and register with 2 buddies for checkpoiting 
146 #if CMK_MEM_CHECKPOINT
147 void ArrayElement::init_checkpt() {
148         if (_memChkptOn == 0) return;
149         if (CkInRestarting()) {
150           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
151         }
152         // only master init checkpoint
153         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
154
155         budPEs[0] = CkMyPe();
156         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
157         CmiAssert(budPEs[0] != budPEs[1]);
158         // inform checkPTMgr
159         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
160         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
161         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
162         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
163 }
164 #endif
165
166 // entry function invoked by checkpoint mgr asking for checkpoint data
167 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
168 #if CMK_MEM_CHECKPOINT
169 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
170 //char index[128];   thisIndexMax.sprint(index);
171 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
172   CkLocMgr *locMgr = thisArray->getLocMgr();
173   CmiAssert(myRec!=NULL);
174   int size;
175   {
176         PUP::sizer p;
177         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
178         size = p.size();
179   }
180   int packSize = size/sizeof(double) +1;
181   CkArrayCheckPTMessage *msg =
182                  new (packSize, 0) CkArrayCheckPTMessage;
183   msg->len = size;
184   msg->index =thisIndexMax;
185   msg->aid = thisArrayID;
186   msg->locMgr = locMgr->getGroupID();
187   msg->cp_flag = 1;
188   {
189         PUP::toMem p(msg->packData);
190         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
191   }
192
193   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
194   checkptMgr.recvData(msg, 2, budPEs);
195   delete m;
196 #endif
197 }
198
199 // checkpoint holder class - for memory checkpointing
200 class CkMemCheckPTInfo: public CkCheckPTInfo
201 {
202   CkArrayCheckPTMessage *ckBuffer;
203 public:
204   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
205                     CkCheckPTInfo(a, loc, idx, pno)
206   {
207     ckBuffer = NULL;
208   }
209   ~CkMemCheckPTInfo() 
210   {
211     if (ckBuffer) delete ckBuffer; 
212   }
213   inline void updateBuffer(CkArrayCheckPTMessage *data) 
214   {
215     CmiAssert(data!=NULL);
216     if (ckBuffer) delete ckBuffer;
217     ckBuffer = data;
218   }    
219   inline CkArrayCheckPTMessage * getCopy()
220   {
221     if (ckBuffer == NULL) {
222       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
223       CmiAbort("Abort!");
224     }
225     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
226   }     
227   inline void updateBuddy(int b1, int b2) {
228      CmiAssert(ckBuffer);
229      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
230      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
231      CmiAssert(pNo != CkMyPe());
232   }
233   inline int getSize() { 
234      CmiAssert(ckBuffer);
235      return ckBuffer->len; 
236   }
237 };
238
239 // checkpoint holder class - for in-disk checkpointing
240 class CkDiskCheckPTInfo: public CkCheckPTInfo 
241 {
242   char *fname;
243   int bud1, bud2;
244   int len;                      // checkpoint size
245 public:
246   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
247   {
248 #if CMK_USE_MKSTEMP
249     fname = new char[64];
250     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
251     mkstemp(fname);
252 #else
253     fname=tmpnam(NULL);
254 #endif
255     bud1 = bud2 = -1;
256     len = 0;
257   }
258   ~CkDiskCheckPTInfo() 
259   {
260     remove(fname);
261   }
262   inline void updateBuffer(CkArrayCheckPTMessage *data) 
263   {
264     double t = CmiWallTimer();
265     // unpack it
266     envelope *env = UsrToEnv(data);
267     CkUnpackMessage(&env);
268     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
269     FILE *f = fopen(fname,"wb");
270     PUP::toDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     // delay sync to the end because otherwise the messages are blocked
273 //    fsync(fileno(f));
274     fclose(f);
275     bud1 = data->bud1;
276     bud2 = data->bud2;
277     len = data->len;
278     delete data;
279     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
280   }
281   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
282   {
283     CkArrayCheckPTMessage *data;
284     FILE *f = fopen(fname,"rb");
285     PUP::fromDisk p(f);
286     CkPupMessage(p, (void **)&data);
287     fclose(f);
288     data->bud1 = bud1;                          // update the buddies
289     data->bud2 = bud2;
290     return data;
291   }
292   inline void updateBuddy(int b1, int b2) {
293      bud1 = b1; bud2 = b2;
294      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295      CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298      return len; 
299   }
300 };
301
302 CkMemCheckPT::CkMemCheckPT(int w)
303 {
304   int numnodes = 0;
305 #if NODE_CHECKPOINT
306   numnodes = CmiNumPhysicalNodes();
307 #else
308   numnodes = CkNumPes();
309 #endif
310 #if CK_NO_PROC_POOL
311   if (numnodes <= 2)
312 #else
313   if (numnodes  == 1)
314 #endif
315   {
316     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
317     _memChkptOn = 0;
318   }
319   inRestarting = 0;
320   recvCount = peCount = 0;
321   ackCount = 0;
322   expectCount = -1;
323   where = w;
324
325 #if CMK_CONVERSE_MPI
326   void pingBuddy();
327   void pingCheckHandler();
328   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
329   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
330 #endif
331 }
332
333 CkMemCheckPT::~CkMemCheckPT()
334 {
335   int len = ckTable.length();
336   for (int i=0; i<len; i++) {
337     delete ckTable[i];
338   }
339 }
340
341 void CkMemCheckPT::pup(PUP::er& p) 
342
343   CBase_CkMemCheckPT::pup(p); 
344   p|cpStarter;
345   p|thisFailedPe;
346   p|failedPes;
347   p|ckCheckPTGroupID;           // recover global variable
348   p|cpCallback;                 // store callback
349   p|where;                      // where to checkpoint
350   p|peCount;
351   if (p.isUnpacking()) {
352     recvCount = 0;
353 #if CMK_CONVERSE_MPI
354     void pingBuddy();
355     void pingCheckHandler();
356     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
357     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
358 #endif
359   }
360 }
361
362 // called by checkpoint mgr to restore an array element
363 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
364 {
365 #if CMK_MEM_CHECKPOINT
366   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
367   // m->index.print();
368   PUP::fromMem p(m->packData);
369   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
370   CmiAssert(mgr);
371 #if STREAMING_INFORMHOME
372   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
373 #else
374   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
375 #endif
376
377   // find a list of array elements bound together
378   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
379   CmiAssert(elt);
380   CkLocRec_local *rec = elt->myRec;
381   CkVec<CkMigratable *> list;
382   mgr->migratableList(rec, list);
383   CmiAssert(list.length() > 0);
384   for (int l=0; l<list.length(); l++) {
385     elt = (ArrayElement *)list[l];
386     elt->budPEs[0] = m->bud1;
387     elt->budPEs[1] = m->bud2;
388     //    reset, may not needed now
389     // for now.
390     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
391       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
392       if (c) c->redNo = 0;
393     }
394   }
395 #endif
396 }
397
398 // return 1 if pe is a crashed processor
399 int CkMemCheckPT::isFailed(int pe)
400 {
401   for (int i=0; i<failedPes.length(); i++)
402     if (failedPes[i] == pe) return 1;
403   return 0;
404 }
405
406 // add pe into history list of all failed processors
407 void CkMemCheckPT::failed(int pe)
408 {
409   if (isFailed(pe)) return;
410   failedPes.push_back(pe);
411 }
412
413 int CkMemCheckPT::totalFailed()
414 {
415   return failedPes.length();
416 }
417
418 // create an checkpoint entry for array element of aid with index.
419 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
420 {
421   // error check, no duplicate
422   int idx, len = ckTable.size();
423   for (idx=0; idx<len; idx++) {
424     CkCheckPTInfo *entry = ckTable[idx];
425     if (index == entry->index) {
426       if (loc == entry->locMgr) {
427           // bindTo array elements
428           return;
429       }
430         // for array inheritance, the following check may fail
431         // because ArrayElement constructor of all superclasses are called
432       if (aid == entry->aid) {
433         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
434         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
435       }
436     }
437   }
438   CkCheckPTInfo *newEntry;
439   if (where == CkCheckPoint_inMEM)
440     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
441   else
442     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
443   ckTable.push_back(newEntry);
444   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
445 }
446
447 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
448 {
449 #if !CMK_CHKP_ALL       
450   int buddy = msg->bud1;
451   if (buddy == CkMyPe()) buddy = msg->bud2;
452   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
453   recvData(msg);
454     // ack
455   thisProxy[buddy].gotData();
456 #else
457   recvArrayCheckpoint(msg);
458   thisProxy[msg->bud2].gotData();
459 #endif
460 }
461
462 // loop through my checkpoint table and ask checkpointed array elements
463 // to send me checkpoint data.
464 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
465 {
466   checkpointed = 1;
467   cpCallback = cb;
468   cpStarter = starter;
469   if (CkMyPe() == cpStarter) {
470     startTime = CmiWallTimer();
471     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
472   }
473 #if !CMK_CHKP_ALL
474   int len = ckTable.length();
475   for (int i=0; i<len; i++) {
476     CkCheckPTInfo *entry = ckTable[i];
477       // always let the bigger number processor send request
478     //if (CkMyPe() < entry->pNo) continue;
479       // always let the smaller number processor send request, may on same proc
480     if (!isMaster(entry->pNo)) continue;
481       // call inmem_checkpoint to the array element, ask it to send
482       // back checkpoint data via recvData().
483     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
484     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
485   }
486     // if my table is empty, then I am done
487   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
488 #else
489   startArrayCheckpoint();
490 #endif
491   // pack and send proc level data
492   sendProcData();
493 }
494
495 class MemElementPacker : public CkLocIterator{
496         private:
497                 CkLocMgr *locMgr;
498                 PUP::er &p;
499         public:
500                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
501                 void addLocation(CkLocation &loc){
502                         CkArrayIndexMax idx = loc.getIndex();
503                         CkGroupID gID = locMgr->ckGetGroupID();
504                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
505                         CmiAssert(elt);
506                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
507                         p|gID;
508                         p|idx;
509                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
510                 }
511 };
512
513 void CkMemCheckPT::pupAllElements(PUP::er &p){
514 #if CMK_CHKP_ALL
515         int numElements;
516         if(!p.isUnpacking()){
517                 numElements = CkCountArrayElements();
518         }
519         p | numElements;
520         if(!p.isUnpacking()){
521                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
522         }
523 #endif
524 }
525
526 void CkMemCheckPT::startArrayCheckpoint(){
527 #if CMK_CHKP_ALL
528         int size;
529         {
530                 PUP::sizer psizer;
531                 pupAllElements(psizer);
532                 size = psizer.size();
533         }
534         int packSize = size/sizeof(double)+1;
535         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
536         msg->len = size;
537         msg->cp_flag = 1;
538         int budPEs[2];
539         msg->bud1=CkMyPe();
540         msg->bud2=ChkptOnPe(CkMyPe());
541         {
542                 PUP::toMem p(msg->packData);
543                 pupAllElements(p);
544         }
545         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
546         chkpTable[0] = msg;
547         recvCount++;
548 #endif
549 }
550
551 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
552 {
553 #if CMK_CHKP_ALL
554         int idx = 1;
555         if(msg->bud1 == CkMyPe()){
556                 idx = 0;
557         }
558         int isChkpting = msg->cp_flag;
559         chkpTable[idx] = msg;
560         if(isChkpting){
561                 recvCount++;
562                 if(recvCount == 2){
563                   if (where == CkCheckPoint_inMEM) {
564                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
565                   }
566                   else if (where == CkCheckPoint_inDISK) {
567                         // another barrier for finalize the writing using fsync
568                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
569                         contribute(0,NULL,CkReduction::sum_int,localcb);
570                   }
571                   else
572                         CmiAbort("Unknown checkpoint scheme");
573                   recvCount = 0;
574                 }
575         }
576 #endif
577 }
578
579 // don't handle array elements
580 static inline void _handleProcData(PUP::er &p)
581 {
582     // save readonlys, and callback BTW
583     CkPupROData(p);
584
585     // save mainchares 
586     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
587         
588 #ifndef CMK_CHARE_USE_PTR
589     // save non-migratable chare
590     CkPupChareData(p);
591 #endif
592
593     // save groups into Groups.dat
594     CkPupGroupData(p);
595
596     // save nodegroups into NodeGroups.dat
597     if(CkMyRank()==0) CkPupNodeGroupData(p);
598 }
599
600 void CkMemCheckPT::sendProcData()
601 {
602   // find out size of buffer
603   int size;
604   {
605     PUP::sizer p;
606     _handleProcData(p);
607     size = p.size();
608   }
609   int packSize = size;
610   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
611   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
612   {
613     PUP::toMem p(msg->packData);
614     _handleProcData(p);
615   }
616   msg->pe = CkMyPe();
617   msg->len = size;
618   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
619   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
620 }
621
622 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
623 {
624   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
625   CpvAccess(procChkptBuf) = msg;
626   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
627   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
628 }
629
630 // ArrayElement call this function to give us the checkpointed data
631 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
632 {
633   int len = ckTable.length();
634   int idx;
635   for (idx=0; idx<len; idx++) {
636     CkCheckPTInfo *entry = ckTable[idx];
637     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
638   }
639   CkAssert(idx < len);
640   int isChkpting = msg->cp_flag;
641   ckTable[idx]->updateBuffer(msg);
642   if (isChkpting) {
643       // all my array elements have returned their inmem data
644       // inform starter processor that I am done.
645     recvCount ++;
646     if (recvCount == ckTable.length()) {
647       if (where == CkCheckPoint_inMEM) {
648         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
649       }
650       else if (where == CkCheckPoint_inDISK) {
651         // another barrier for finalize the writing using fsync
652         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
653         contribute(0,NULL,CkReduction::sum_int,localcb);
654       }
655       else
656         CmiAbort("Unknown checkpoint scheme");
657       recvCount = 0;
658     } 
659   }
660 }
661
662 // only used in disk checkpointing
663 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
664 {
665   delete m;
666 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
667   system("sync");
668 #endif
669   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
670 }
671
672 // only is called on cpStarter when checkpoint is done
673 void CkMemCheckPT::cpFinish()
674 {
675   CmiAssert(CkMyPe() == cpStarter);
676   peCount++;
677     // now that all processors have finished, activate callback
678   if (peCount == 2) {
679     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
680     cpCallback.send();
681     peCount = 0;
682 #if !CMK_CHKP_ALL
683     thisProxy.report();
684 #endif
685   }
686 }
687
688 // for debugging, report checkpoint info
689 void CkMemCheckPT::report()
690 {
691   int objsize = 0;
692   int len = ckTable.length();
693   for (int i=0; i<len; i++) {
694     CkCheckPTInfo *entry = ckTable[i];
695     CmiAssert(entry);
696     objsize += entry->getSize();
697   }
698   CmiAssert(CpvAccess(procChkptBuf));
699 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
700 }
701
702 /*****************************************************************************
703                         RESTART Procedure
704 *****************************************************************************/
705
706 // master processor of two buddies
707 inline int CkMemCheckPT::isMaster(int buddype)
708 {
709 #if 0
710   int mype = CkMyPe();
711 //CkPrintf("ismaster: %d %d\n", pe, mype);
712   if (CkNumPes() - totalFailed() == 2) {
713     return mype > buddype;
714   }
715   for (int i=1; i<CkNumPes(); i++) {
716     int me = (buddype+i)%CkNumPes();
717     if (isFailed(me)) continue;
718     if (me == mype) return 1;
719     else return 0;
720   }
721   return 0;
722 #else
723     // smaller one
724   int mype = CkMyPe();
725 //CkPrintf("ismaster: %d %d\n", pe, mype);
726   if (CkNumPes() - totalFailed() == 2) {
727     return mype < buddype;
728   }
729 #if NODE_CHECKPOINT
730   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
731   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
732 #else
733   for (int i=1; i<CkNumPes(); i++) {
734 #endif
735     int me = (mype+i)%CkNumPes();
736     if (isFailed(me)) continue;
737     if (me == buddype) return 1;
738     else return 0;
739   }
740   return 0;
741 #endif
742 }
743
744
745
746 #if 0
747 // helper class to pup all elements that belong to same ckLocMgr
748 class ElementDestoryer : public CkLocIterator {
749 private:
750         CkLocMgr *locMgr;
751 public:
752         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
753         void addLocation(CkLocation &loc) {
754                 CkArrayIndex idx=loc.getIndex();
755                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
756                 loc.destroy();
757         }
758 };
759 #endif
760
761 // restore the bitmap vector for LB
762 void CkMemCheckPT::resetLB(int diepe)
763 {
764 #if CMK_LBDB_ON
765   int i;
766   char *bitmap = new char[CkNumPes()];
767   // set processor available bitmap
768   get_avail_vector(bitmap);
769
770   for (i=0; i<failedPes.length(); i++)
771     bitmap[failedPes[i]] = 0; 
772   bitmap[diepe] = 0;
773
774 #if CK_NO_PROC_POOL
775   set_avail_vector(bitmap);
776 #endif
777
778   // if I am the crashed pe, rebuild my failedPEs array
779   if (CkMyNode() == diepe)
780     for (i=0; i<CkNumPes(); i++) 
781       if (bitmap[i]==0) failed(i);
782
783   delete [] bitmap;
784 #endif
785 }
786
787 // in case when failedPe dies, everybody go through its checkpoint table:
788 // destory all array elements
789 // recover lost buddies
790 // reconstruct all array elements from check point data
791 // called on all processors
792 void CkMemCheckPT::restart(int diePe)
793 {
794 #if CMK_MEM_CHECKPOINT
795   double curTime = CmiWallTimer();
796   if (CkMyPe() == diePe)
797     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
798   stage = (char*)"resetLB";
799   startTime = curTime;
800   if (CkMyPe() == diePe)
801     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
802
803 #if CK_NO_PROC_POOL
804   failed(diePe);        // add into the list of failed pes
805 #endif
806   thisFailedPe = diePe;
807
808   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
809
810   inRestarting = 1;
811                                                                                 
812   // disable load balancer's barrier
813   if (CkMyPe() != diePe) resetLB(diePe);
814
815   CKLOCMGR_LOOP(mgr->startInserting(););
816
817   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
818   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
819 /*
820   if (CkMyPe() == 0)
821     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
822 */
823 #endif
824 }
825
826 // loally remove all array elements
827 void CkMemCheckPT::removeArrayElements()
828 {
829 #if CMK_MEM_CHECKPOINT
830   int len = ckTable.length();
831   double curTime = CmiWallTimer();
832   if (CkMyPe() == thisFailedPe) 
833     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
834   stage = (char*)"removeArrayElements";
835   startTime = curTime;
836
837   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
838   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
839
840   // get rid of all buffering and remote recs
841   // including destorying all array elements
842   CKLOCMGR_LOOP(mgr->flushAllRecs(););
843
844 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
845
846   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
847   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
848 #endif
849 }
850
851 // flush state in reduction manager
852 void CkMemCheckPT::resetReductionMgr()
853 {
854   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
855   int numGroups = CkpvAccess(_groupIDTable)->size();
856   for(int i=0;i<numGroups;i++) {
857     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
858     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
859     obj->flushStates();
860     obj->ckJustMigrated();
861   }
862   // reset again
863   //CpvAccess(_qd)->flushStates();
864
865 #if 1
866   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
867   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
868 #else
869   if (CkMyPe() == 0)
870     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
871 #endif
872 }
873
874 // recover the lost buddies
875 void CkMemCheckPT::recoverBuddies()
876 {
877   int idx;
878   int len = ckTable.length();
879   // ready to flush reduction manager
880   // cannot be CkMemCheckPT::restart because destory will modify states
881   double curTime = CmiWallTimer();
882   if (CkMyPe() == thisFailedPe)
883   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
884   stage = (char *)"recoverBuddies";
885   if (CkMyPe() == thisFailedPe)
886   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
887   startTime = curTime;
888
889   // recover buddies
890   expectCount = 0;
891 #if !CMK_CHKP_ALL
892   for (idx=0; idx<len; idx++) {
893     CkCheckPTInfo *entry = ckTable[idx];
894     if (entry->pNo == thisFailedPe) {
895 #if CK_NO_PROC_POOL
896       // find a new buddy
897       int budPe = BuddyPE(CkMyPe());
898 #else
899       int budPe = thisFailedPe;
900 #endif
901       CkArrayCheckPTMessage *msg = entry->getCopy();
902       msg->bud1 = budPe;
903       msg->bud2 = CkMyPe();
904       msg->cp_flag = 0;            // not checkpointing
905       thisProxy[budPe].recoverEntry(msg);
906       expectCount ++;
907     }
908   }
909 #else
910   //send to failed pe
911   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
912 #if CK_NO_PROC_POOL
913       // find a new buddy
914       int budPe = BuddyPE(CkMyPe());
915 #else
916       int budPe = thisFailedPe;
917 #endif
918       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
919       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
920           msg->cp_flag = 0;            // not checkpointing
921       msg->bud1 = budPe;
922       msg->bud2 = CkMyPe();
923       thisProxy[budPe].recoverEntry(msg);
924       expectCount ++;
925   }
926 #endif
927
928 #if 1
929   if (expectCount == 0) {
930     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
931     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
932   }
933 #else
934   if (CkMyPe() == 0) {
935     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
936   }
937 #endif
938
939   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
940 }
941
942 void CkMemCheckPT::gotData()
943 {
944   ackCount ++;
945   if (ackCount == expectCount) {
946     ackCount = 0;
947     expectCount = -1;
948     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
949     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
950   }
951 }
952
953 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
954 {
955   for (int i=0; i<n; i++) {
956     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
957     mgr->updateLocation(idx[i], nowOnPe);
958   }
959 }
960
961 // restore array elements
962 void CkMemCheckPT::recoverArrayElements()
963 {
964   double curTime = CmiWallTimer();
965   int len = ckTable.length();
966   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
967   stage = (char *)"recoverArrayElements";
968   if (CkMyPe() == thisFailedPe)
969   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
970   startTime = curTime;
971
972   // recover all array elements
973   int count = 0;
974 #if STREAMING_INFORMHOME
975   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
976   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
977 #endif
978 #if !CMK_CHKP_ALL
979   for (int idx=0; idx<len; idx++)
980   {
981     CkCheckPTInfo *entry = ckTable[idx];
982 #if CK_NO_PROC_POOL
983     // the bigger one will do 
984 //    if (CkMyPe() < entry->pNo) continue;
985     if (!isMaster(entry->pNo)) continue;
986 #else
987     // smaller one do it, which has the original object
988     if (CkMyPe() == entry->pNo+1 || 
989         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
990 #endif
991 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
992
993     entry->updateBuddy(CkMyPe(), entry->pNo);
994     CkArrayCheckPTMessage *msg = entry->getCopy();
995     // gzheng
996     //thisProxy[CkMyPe()].inmem_restore(msg);
997     inmem_restore(msg);
998 #if STREAMING_INFORMHOME
999     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1000     int homePe = mgr->homePe(msg->index);
1001     if (homePe != CkMyPe()) {
1002       gmap[homePe].push_back(msg->locMgr);
1003       imap[homePe].push_back(msg->index);
1004     }
1005 #endif
1006     CkFreeMsg(msg);
1007     count ++;
1008   }
1009 #else
1010         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1011         recoverAll(msg,gmap,imap);
1012     CkFreeMsg(msg);
1013 #endif
1014 #if STREAMING_INFORMHOME
1015   for (int i=0; i<CkNumPes(); i++) {
1016     if (gmap[i].size() && i!=CkMyPe()) {
1017       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1018     }
1019   }
1020   delete [] imap;
1021   delete [] gmap;
1022 #endif
1023   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1024
1025   CKLOCMGR_LOOP(mgr->doneInserting(););
1026
1027   inRestarting = 0;
1028   // _crashedNode = -1;
1029   CpvAccess(_crashedNode) = -1;
1030
1031   if (CkMyPe() == 0)
1032     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1033 }
1034
1035 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1036 #if CMK_CHKP_ALL
1037         PUP::fromMem p(msg->packData);
1038         int numElements;
1039         p|numElements;
1040         if(p.isUnpacking()){
1041                 for(int i=0;i<numElements;i++){
1042                         CkGroupID gID;
1043                         CkArrayIndex idx;
1044                         p|gID;
1045                         p|idx;
1046                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1047 #if STREAMING_INFORMHOME
1048                         mgr->resume(idx,p,CmiFalse);
1049 #else
1050                         mgr->resume(idx,p,CmiTrue);
1051 #endif
1052                           /*CkLocRec_local *rec = loc.getLocalRecord();
1053                           CmiAssert(rec);
1054                           CkVec<CkMigratable *> list;
1055                           mgr->migratableList(rec, list);
1056                           CmiAssert(list.length() > 0);
1057                           for (int l=0; l<list.length(); l++) {
1058                                 ArrayElement * elt = (ArrayElement *)list[l];
1059                                 //    reset, may not needed now
1060                                 // for now.
1061                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1062                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1063                                   if (c) c->redNo = 0;
1064                                 }
1065                           }*/
1066 #if STREAMING_INFORMHOME
1067                         int homePe = mgr->homePe(idx);
1068                         if (homePe != CkMyPe()) {
1069                           gmap[homePe].push_back(gID);
1070                           imap[homePe].push_back(idx);
1071                         }
1072 #endif
1073                 }
1074         }
1075 #endif
1076 }
1077
1078 static double restartT;
1079
1080 // on every processor
1081 // turn load balancer back on
1082 void CkMemCheckPT::finishUp()
1083 {
1084   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1085   //CKLOCMGR_LOOP(mgr->doneInserting(););
1086   
1087   if (CkMyPe() == thisFailedPe)
1088   {
1089        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1090        //CkStartQD(cpCallback);
1091        cpCallback.send();
1092        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1093   }
1094
1095 #if CK_NO_PROC_POOL
1096 #if NODE_CHECKPOINT
1097   int numnodes = CmiNumPhysicalNodes();
1098 #else
1099   int numnodes = CkNumPes();
1100 #endif
1101   if (numnodes-totalFailed() <=2) {
1102     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1103     _memChkptOn = 0;
1104   }
1105 #endif
1106 }
1107
1108 // called only on 0
1109 void CkMemCheckPT::quiescence(CkCallback &cb)
1110 {
1111   static int pe_count = 0;
1112   pe_count ++;
1113   CmiAssert(CkMyPe() == 0);
1114   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1115   if (pe_count == CkNumPes()) {
1116     pe_count = 0;
1117     cb.send();
1118   }
1119 }
1120
1121 // User callable function - to start a checkpoint
1122 // callback cb is used to pass control back
1123 void CkStartMemCheckpoint(CkCallback &cb)
1124 {
1125 #if CMK_MEM_CHECKPOINT
1126   if (_memChkptOn == 0) {
1127     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1128     cb.send();
1129     return;
1130   }
1131   if (CkInRestarting()) {
1132       // trying to checkpointing during restart
1133     cb.send();
1134     return;
1135   }
1136     // store user callback and user data
1137   CkMemCheckPT::cpCallback = cb;
1138
1139     // broadcast to start check pointing
1140   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1141   checkptMgr.doItNow(CkMyPe(), cb);
1142 #else
1143   // when mem checkpoint is disabled, invike cb immediately
1144   cb.send();
1145 #endif
1146 }
1147
1148 void CkRestartCheckPoint(int diePe)
1149 {
1150 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1151   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1152   // broadcast
1153   checkptMgr.restart(diePe);
1154 }
1155
1156 static int _diePE = -1;
1157
1158 // callback function used locally by ccs handler
1159 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1160 {
1161 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1162   CkRestartCheckPoint(_diePE);
1163 }
1164
1165 // Converse function handles
1166 static int askPhaseHandlerIdx;
1167 static int recvPhaseHandlerIdx;
1168 static int askProcDataHandlerIdx;
1169 static int restartBcastHandlerIdx;
1170 static int recoverProcDataHandlerIdx;
1171 static int restartBeginHandlerIdx;
1172 static int notifyHandlerIdx;
1173
1174 // called on crashed PE
1175 static void restartBeginHandler(char *msg)
1176 {
1177 #if CMK_MEM_CHECKPOINT
1178   CmiFree(msg);
1179   static int count = 0;
1180   CmiAssert(CkMyPe() == _diePE);
1181   count ++;
1182   if (count == CkNumPes()) {
1183     CkRestartCheckPointCallback(NULL, NULL);
1184     count = 0;
1185   }
1186 #endif
1187 }
1188
1189 extern void _discard_charm_message();
1190 extern void _resume_charm_message();
1191
1192 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1193         return data;
1194 }
1195
1196 static void restartBcastHandler(char *msg)
1197 {
1198 #if CMK_MEM_CHECKPOINT
1199   // advance phase counter
1200   CkMemCheckPT::inRestarting = 1;
1201   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1202   // gzheng
1203   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1204
1205   if (CkMyPe()==_diePE)
1206     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1207
1208   // reset QD counters
1209 /*  gzheng
1210   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1211 */
1212
1213 /*  gzheng
1214   if (CkMyPe()==_diePE)
1215       CkRestartCheckPointCallback(NULL, NULL);
1216 */
1217   CmiFree(msg);
1218
1219   _resume_charm_message();
1220
1221     // reduction
1222   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1223   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1224   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1225   checkpointed = 0;
1226 #endif
1227 }
1228
1229 extern void _initDone();
1230
1231 // called on crashed processor
1232 static void recoverProcDataHandler(char *msg)
1233 {
1234 #if CMK_MEM_CHECKPOINT
1235    int i;
1236    envelope *env = (envelope *)msg;
1237    CkUnpackMessage(&env);
1238    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1239    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1240    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1241    //cur_restart_phase ++;
1242      // gzheng ?
1243    //CpvAccess(_qd)->flushStates();
1244
1245    // restore readonly, mainchare, group, nodegroup
1246 //   int temp = cur_restart_phase;
1247 //   cur_restart_phase = -1;
1248    PUP::fromMem p(procMsg->packData);
1249    _handleProcData(p);
1250
1251    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1252    // gzheng
1253    CKLOCMGR_LOOP(mgr->startInserting(););
1254
1255    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1256    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1257    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1258    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1259
1260    _initDone();
1261 //   CpvAccess(_qd)->flushStates();
1262    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1263 #endif
1264 }
1265
1266 // called on its backup processor
1267 // get backup message buffer and sent to crashed processor
1268 static void askProcDataHandler(char *msg)
1269 {
1270 #if CMK_MEM_CHECKPOINT
1271     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1272     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1273     if (CpvAccess(procChkptBuf) == NULL)  {
1274       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1275       CkAbort("no checkpoint found");
1276     }
1277     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1278     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1279
1280     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1281
1282     CkPackMessage(&env);
1283     CmiSetHandler(env, recoverProcDataHandlerIdx);
1284     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1285     CpvAccess(procChkptBuf) = NULL;
1286 #endif
1287 }
1288
1289 // called on PE 0
1290 void qd_callback(void *m)
1291 {
1292    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1293    CkFreeMsg(m);
1294 #ifdef CMK_SMP
1295    for(int i=0;i<CmiMyNodeSize();i++){
1296    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1297    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1298         CmiSetHandler(msg, askProcDataHandlerIdx);
1299         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1300         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1301    }
1302    return;
1303 #endif
1304    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1305    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1306    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1307    CmiSetHandler(msg, askProcDataHandlerIdx);
1308    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1309    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1310
1311 }
1312
1313 // on crashed node
1314 void CkMemRestart(const char *dummy, CkArgMsg *args)
1315 {
1316 #if CMK_MEM_CHECKPOINT
1317    _diePE = CmiMyNode();
1318    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1319    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1320    CkMemCheckPT::inRestarting = 1;
1321
1322   CpvAccess( _crashedNode )= CmiMyNode();
1323         
1324   _discard_charm_message();
1325   
1326   if(CmiMyRank()==0){
1327     CkCallback cb(qd_callback);
1328     CkStartQD(cb);
1329     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1330   }
1331 #else
1332    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1333 #endif
1334 }
1335
1336 // can be called in other files
1337 // return true if it is in restarting
1338 extern "C"
1339 int CkInRestarting()
1340 {
1341 #if CMK_MEM_CHECKPOINT
1342   if (CpvAccess( _crashedNode)!=-1) return 1;
1343   // gzheng
1344   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1345   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1346   return CkMemCheckPT::inRestarting;
1347 #else
1348   return 0;
1349 #endif
1350 }
1351
1352 /*****************************************************************************
1353                 module initialization
1354 *****************************************************************************/
1355
1356 static int arg_where = CkCheckPoint_inMEM;
1357
1358 #if CMK_MEM_CHECKPOINT
1359 void init_memcheckpt(char **argv)
1360 {
1361     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1362       arg_where = CkCheckPoint_inDISK;
1363     }
1364
1365         // initiliazing _crashedNode variable
1366         CpvInitialize(int, _crashedNode);
1367         CpvAccess(_crashedNode) = -1;
1368
1369 }
1370 #endif
1371
1372 class CkMemCheckPTInit: public Chare {
1373 public:
1374   CkMemCheckPTInit(CkArgMsg *m) {
1375 #if CMK_MEM_CHECKPOINT
1376     if (arg_where == CkCheckPoint_inDISK) {
1377       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1378     }
1379     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1380     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1381 #endif
1382   }
1383 };
1384
1385 static void notifyHandler(char *msg)
1386 {
1387 #if CMK_MEM_CHECKPOINT
1388   CmiFree(msg);
1389       /* immediately increase restart phase to filter old messages */
1390   CpvAccess(_curRestartPhase) ++;
1391   CpvAccess(_qd)->flushStates();
1392   _discard_charm_message();
1393 #endif
1394 }
1395
1396 extern "C"
1397 void notify_crash(int node)
1398 {
1399 #ifdef CMK_MEM_CHECKPOINT
1400   CpvAccess( _crashedNode) = node;
1401 #ifdef CMK_SMP
1402   for(int i=0;i<CkMyNodeSize();i++){
1403         CpvAccessOther(_crashedNode,i)=node;
1404   }
1405 #endif
1406   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1407   CkMemCheckPT::inRestarting = 1;
1408
1409     // this may be in interrupt handler, send a message to reset QD
1410   int pe = CmiNodeFirst(CkMyNode());
1411   for(int i=0;i<CkMyNodeSize();i++){
1412         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1413         CmiSetHandler(msg, notifyHandlerIdx);
1414         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1415   }
1416 #endif
1417 }
1418
1419 extern "C" void (*notify_crash_fn)(int node);
1420
1421 #if CMK_CONVERSE_MPI
1422 static int pingHandlerIdx;
1423 static int pingCheckHandlerIdx;
1424 static int buddyDieHandlerIdx;
1425 static double lastPingTime = -1;
1426
1427 extern "C" void mpi_restart_crashed(int pe, int rank);
1428 extern "C" int  find_spare_mpirank(int pe);
1429
1430 void pingBuddy();
1431 void pingCheckHandler();
1432
1433 void buddyDieHandler(char *msg)
1434 {
1435 #if CMK_MEM_CHECKPOINT
1436    // notify
1437    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1438    notify_crash(diepe);
1439    // send message to crash pe to let it restart
1440    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1441    int newrank = find_spare_mpirank(diepe);
1442    int buddy = obj->BuddyPE(CmiMyPe());
1443    if (buddy == diepe)  {
1444      mpi_restart_crashed(diepe, newrank);
1445      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1446    }
1447 #endif
1448 }
1449
1450 void pingHandler(void *msg)
1451 {
1452   lastPingTime = CmiWallTimer();
1453   CmiFree(msg);
1454 }
1455
1456 void pingCheckHandler()
1457 {
1458 #if CMK_MEM_CHECKPOINT
1459   double now = CmiWallTimer();
1460   if (lastPingTime > 0 && now - lastPingTime > 4) {
1461     int i, pe, buddy;
1462     // tell everyone the buddy dies
1463     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1464     for (i = 1; i < CmiNumPes(); i++) {
1465        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1466        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1467     }
1468     buddy = pe;
1469     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1470     for (int pe = 0; pe < CmiNumPes(); pe++) {
1471       if (obj->isFailed(pe) || pe == buddy) continue;
1472       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1473       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1474       CmiSetHandler(msg, buddyDieHandlerIdx);
1475       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1476     }
1477   }
1478   else 
1479     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1480 #endif
1481 }
1482
1483 void pingBuddy()
1484 {
1485 #if CMK_MEM_CHECKPOINT
1486   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1487   if (obj) {
1488     int buddy = obj->BuddyPE(CkMyPe());
1489 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1490     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1491     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1492     CmiSetHandler(msg, pingHandlerIdx);
1493     CmiGetRestartPhase(msg) = 9999;
1494     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1495   }
1496   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1497 #endif
1498 }
1499 #endif
1500
1501 // initproc
1502 void CkRegisterRestartHandler( )
1503 {
1504 #if CMK_MEM_CHECKPOINT
1505   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1506   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1507   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1508   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1509   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1510
1511 #if CMK_CONVERSE_MPI
1512   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1513   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1514   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1515 #endif
1516
1517   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1518   CpvAccess(procChkptBuf) = NULL;
1519
1520   notify_crash_fn = notify_crash;
1521
1522 #if ! CMK_CONVERSE_MPI
1523   // print pid to kill
1524 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1525 //  sleep(4);
1526 #endif
1527 #endif
1528 }
1529
1530
1531 extern "C"
1532 int CkHasCheckpoints()
1533 {
1534   return checkpointed;
1535 }
1536
1537 /// @todo: the following definitions should be moved to a separate file containing
1538 // structures and functions about fault tolerance strategies
1539
1540 /**
1541  *  * @brief: function for killing a process                                             
1542  *   */
1543 #ifdef CMK_MEM_CHECKPOINT
1544 #if CMK_HAS_GETPID
1545 void killLocal(void *_dummy,double curWallTime){
1546         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1547         if(CmiWallTimer()<killTime-1){
1548                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1549         }else{ 
1550 #if CMK_CONVERSE_MPI
1551                                 CkDieNow();
1552 #else 
1553                 kill(getpid(),SIGKILL);                                               
1554 #endif
1555         }              
1556
1557 #else
1558 void killLocal(void *_dummy,double curWallTime){
1559   CmiAbort("kill() not supported!");
1560 }
1561 #endif
1562 #endif
1563
1564 #ifdef CMK_MEM_CHECKPOINT
1565 /**
1566  * @brief: reads the file with the kill information
1567  */
1568 void readKillFile(){
1569         FILE *fp=fopen(killFile,"r");
1570         if(!fp){
1571                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1572                 return;
1573         }
1574         int proc;
1575         double sec;
1576         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1577                 if(proc == CkMyNode() && CkMyRank() == 0){
1578                         killTime = CmiWallTimer()+sec;
1579                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1580                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1581                 }
1582         }
1583         fclose(fp);
1584 }
1585
1586 #if ! CMK_CONVERSE_MPI
1587 void CkDieNow()
1588 {
1589          // ignored for non-mpi version
1590         CmiPrintf("[%d] die now.\n", CmiMyPe());
1591         killTime = CmiWallTimer()+0.001;
1592         CcdCallFnAfter(killLocal,NULL,1);
1593 }
1594 #endif
1595
1596 #endif
1597
1598 #include "CkMemCheckpoint.def.h"
1599
1600