do not detect the failure on node 0 when load balancing is undergoing
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 int CkMemCheckPT::inLoadbalancing = 0;
75 double CkMemCheckPT::startTime;
76 char *CkMemCheckPT::stage;
77 CkCallback CkMemCheckPT::cpCallback;
78
79 int _memChkptOn = 1;                    // checkpoint is on or off
80
81 CkGroupID ckCheckPTGroupID;             // readonly
82
83 static int checkpointed = 0;
84
85 /// @todo the following declarations should be moved into a separate file for all 
86 // fault tolerant strategies
87
88 #ifdef CMK_MEM_CHECKPOINT
89 // name of the kill file that contains processes to be killed 
90 char *killFile;                                               
91 // flag for the kill file         
92 int killFlag=0;
93 // variable for storing the killing time
94 double killTime=0.0;
95 #endif
96
97 #ifdef CKLOCMGR_LOOP
98 #undef CKLOCMGR_LOOP
99 #endif
100 // loop over all CkLocMgr and do "code"
101 #define  CKLOCMGR_LOOP(code)    {       \
102   int numGroups = CkpvAccess(_groupIDTable)->size();    \
103   for(int i=0;i<numGroups;i++) {        \
104     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
105     if(obj->isLocMgr())  {      \
106       CkLocMgr *mgr = (CkLocMgr*)obj;   \
107       code      \
108     }   \
109   }     \
110  }
111
112 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
113 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
114
115 // compute the backup processor
116 // FIXME: avoid crashed processors
117 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
118
119 inline int CkMemCheckPT::BuddyPE(int pe)
120 {
121   int budpe;
122 #if NODE_CHECKPOINT
123     // buddy is the processor with same rank on the next physical node
124   int r1 = CmiPhysicalRank(pe);
125   int budnode = CmiPhysicalNodeID(pe);
126   do {
127     budnode = (budnode+1)%CmiNumPhysicalNodes();
128     int *pelist;
129     int num;
130     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
131     budpe = pelist[r1 % num];
132   } while (isFailed(budpe));
133   if (budpe == pe) {
134     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
135     CmiAbort("Failed to find a buddy processor");
136   }
137 #else
138   budpe = pe;
139   while (budpe == pe || isFailed(budpe)) 
140           budpe = (budpe+1)%CkNumPes();
141 #endif
142   return budpe;
143 }
144
145 // called in array element constructor
146 // choose and register with 2 buddies for checkpoiting 
147 #if CMK_MEM_CHECKPOINT
148 void ArrayElement::init_checkpt() {
149         if (_memChkptOn == 0) return;
150         if (CkInRestarting()) {
151           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
152         }
153         // only master init checkpoint
154         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
155
156         budPEs[0] = CkMyPe();
157         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
158         CmiAssert(budPEs[0] != budPEs[1]);
159         // inform checkPTMgr
160         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
161         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
162         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
163         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
164 }
165 #endif
166
167 // entry function invoked by checkpoint mgr asking for checkpoint data
168 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
169 #if CMK_MEM_CHECKPOINT
170 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
171 //char index[128];   thisIndexMax.sprint(index);
172 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
173   CkLocMgr *locMgr = thisArray->getLocMgr();
174   CmiAssert(myRec!=NULL);
175   int size;
176   {
177         PUP::sizer p;
178         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
179         size = p.size();
180   }
181   int packSize = size/sizeof(double) +1;
182   CkArrayCheckPTMessage *msg =
183                  new (packSize, 0) CkArrayCheckPTMessage;
184   msg->len = size;
185   msg->index =thisIndexMax;
186   msg->aid = thisArrayID;
187   msg->locMgr = locMgr->getGroupID();
188   msg->cp_flag = 1;
189   {
190         PUP::toMem p(msg->packData);
191         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
192   }
193
194   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
195   checkptMgr.recvData(msg, 2, budPEs);
196   delete m;
197 #endif
198 }
199
200 // checkpoint holder class - for memory checkpointing
201 class CkMemCheckPTInfo: public CkCheckPTInfo
202 {
203   CkArrayCheckPTMessage *ckBuffer;
204 public:
205   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
206                     CkCheckPTInfo(a, loc, idx, pno)
207   {
208     ckBuffer = NULL;
209   }
210   ~CkMemCheckPTInfo() 
211   {
212     if (ckBuffer) delete ckBuffer; 
213   }
214   inline void updateBuffer(CkArrayCheckPTMessage *data) 
215   {
216     CmiAssert(data!=NULL);
217     if (ckBuffer) delete ckBuffer;
218     ckBuffer = data;
219   }    
220   inline CkArrayCheckPTMessage * getCopy()
221   {
222     if (ckBuffer == NULL) {
223       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
224       CmiAbort("Abort!");
225     }
226     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
227   }     
228   inline void updateBuddy(int b1, int b2) {
229      CmiAssert(ckBuffer);
230      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
231      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
232      CmiAssert(pNo != CkMyPe());
233   }
234   inline int getSize() { 
235      CmiAssert(ckBuffer);
236      return ckBuffer->len; 
237   }
238 };
239
240 // checkpoint holder class - for in-disk checkpointing
241 class CkDiskCheckPTInfo: public CkCheckPTInfo 
242 {
243   char *fname;
244   int bud1, bud2;
245   int len;                      // checkpoint size
246 public:
247   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
248   {
249 #if CMK_USE_MKSTEMP
250     fname = new char[64];
251     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
252     mkstemp(fname);
253 #else
254     fname=tmpnam(NULL);
255 #endif
256     bud1 = bud2 = -1;
257     len = 0;
258   }
259   ~CkDiskCheckPTInfo() 
260   {
261     remove(fname);
262   }
263   inline void updateBuffer(CkArrayCheckPTMessage *data) 
264   {
265     double t = CmiWallTimer();
266     // unpack it
267     envelope *env = UsrToEnv(data);
268     CkUnpackMessage(&env);
269     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
270     FILE *f = fopen(fname,"wb");
271     PUP::toDisk p(f);
272     CkPupMessage(p, (void **)&data);
273     // delay sync to the end because otherwise the messages are blocked
274 //    fsync(fileno(f));
275     fclose(f);
276     bud1 = data->bud1;
277     bud2 = data->bud2;
278     len = data->len;
279     delete data;
280     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
281   }
282   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
283   {
284     CkArrayCheckPTMessage *data;
285     FILE *f = fopen(fname,"rb");
286     PUP::fromDisk p(f);
287     CkPupMessage(p, (void **)&data);
288     fclose(f);
289     data->bud1 = bud1;                          // update the buddies
290     data->bud2 = bud2;
291     return data;
292   }
293   inline void updateBuddy(int b1, int b2) {
294      bud1 = b1; bud2 = b2;
295      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
296      CmiAssert(pNo != CkMyPe());
297   }
298   inline int getSize() { 
299      return len; 
300   }
301 };
302
303 CkMemCheckPT::CkMemCheckPT(int w)
304 {
305   int numnodes = 0;
306 #if NODE_CHECKPOINT
307   numnodes = CmiNumPhysicalNodes();
308 #else
309   numnodes = CkNumPes();
310 #endif
311 #if CK_NO_PROC_POOL
312   if (numnodes <= 2)
313 #else
314   if (numnodes  == 1)
315 #endif
316   {
317     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
318     _memChkptOn = 0;
319   }
320   inRestarting = 0;
321   recvCount = peCount = 0;
322   ackCount = 0;
323   expectCount = -1;
324   where = w;
325
326 #if CMK_CONVERSE_MPI
327   void pingBuddy();
328   void pingCheckHandler();
329   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
330   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
331 #endif
332 }
333
334 CkMemCheckPT::~CkMemCheckPT()
335 {
336   int len = ckTable.length();
337   for (int i=0; i<len; i++) {
338     delete ckTable[i];
339   }
340 }
341
342 void CkMemCheckPT::pup(PUP::er& p) 
343
344   CBase_CkMemCheckPT::pup(p); 
345   p|cpStarter;
346   p|thisFailedPe;
347   p|failedPes;
348   p|ckCheckPTGroupID;           // recover global variable
349   p|cpCallback;                 // store callback
350   p|where;                      // where to checkpoint
351   p|peCount;
352   if (p.isUnpacking()) {
353     recvCount = 0;
354 #if CMK_CONVERSE_MPI
355     void pingBuddy();
356     void pingCheckHandler();
357     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
358     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
359 #endif
360   }
361 }
362
363 // called by checkpoint mgr to restore an array element
364 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
365 {
366 #if CMK_MEM_CHECKPOINT
367   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
368   // m->index.print();
369   PUP::fromMem p(m->packData);
370   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
371   CmiAssert(mgr);
372 #if STREAMING_INFORMHOME
373   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
374 #else
375   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
376 #endif
377
378   // find a list of array elements bound together
379   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
380   CmiAssert(elt);
381   CkLocRec_local *rec = elt->myRec;
382   CkVec<CkMigratable *> list;
383   mgr->migratableList(rec, list);
384   CmiAssert(list.length() > 0);
385   for (int l=0; l<list.length(); l++) {
386     elt = (ArrayElement *)list[l];
387     elt->budPEs[0] = m->bud1;
388     elt->budPEs[1] = m->bud2;
389     //    reset, may not needed now
390     // for now.
391     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
392       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
393       if (c) c->redNo = 0;
394     }
395   }
396 #endif
397 }
398
399 // return 1 if pe is a crashed processor
400 int CkMemCheckPT::isFailed(int pe)
401 {
402   for (int i=0; i<failedPes.length(); i++)
403     if (failedPes[i] == pe) return 1;
404   return 0;
405 }
406
407 // add pe into history list of all failed processors
408 void CkMemCheckPT::failed(int pe)
409 {
410   if (isFailed(pe)) return;
411   failedPes.push_back(pe);
412 }
413
414 int CkMemCheckPT::totalFailed()
415 {
416   return failedPes.length();
417 }
418
419 // create an checkpoint entry for array element of aid with index.
420 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
421 {
422   // error check, no duplicate
423   int idx, len = ckTable.size();
424   for (idx=0; idx<len; idx++) {
425     CkCheckPTInfo *entry = ckTable[idx];
426     if (index == entry->index) {
427       if (loc == entry->locMgr) {
428           // bindTo array elements
429           return;
430       }
431         // for array inheritance, the following check may fail
432         // because ArrayElement constructor of all superclasses are called
433       if (aid == entry->aid) {
434         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
435         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
436       }
437     }
438   }
439   CkCheckPTInfo *newEntry;
440   if (where == CkCheckPoint_inMEM)
441     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
442   else
443     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
444   ckTable.push_back(newEntry);
445   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
446 }
447
448 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
449 {
450 #if !CMK_CHKP_ALL       
451   int buddy = msg->bud1;
452   if (buddy == CkMyPe()) buddy = msg->bud2;
453   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
454   recvData(msg);
455     // ack
456   thisProxy[buddy].gotData();
457 #else
458   recvArrayCheckpoint(msg);
459   thisProxy[msg->bud2].gotData();
460 #endif
461 }
462
463 // loop through my checkpoint table and ask checkpointed array elements
464 // to send me checkpoint data.
465 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
466 {
467   checkpointed = 1;
468   cpCallback = cb;
469   cpStarter = starter;
470   if (CkMyPe() == cpStarter) {
471     startTime = CmiWallTimer();
472     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
473   }
474 #if !CMK_CHKP_ALL
475   int len = ckTable.length();
476   for (int i=0; i<len; i++) {
477     CkCheckPTInfo *entry = ckTable[i];
478       // always let the bigger number processor send request
479     //if (CkMyPe() < entry->pNo) continue;
480       // always let the smaller number processor send request, may on same proc
481     if (!isMaster(entry->pNo)) continue;
482       // call inmem_checkpoint to the array element, ask it to send
483       // back checkpoint data via recvData().
484     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
485     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
486   }
487     // if my table is empty, then I am done
488   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
489 #else
490   startArrayCheckpoint();
491 #endif
492   // pack and send proc level data
493   sendProcData();
494 }
495
496 class MemElementPacker : public CkLocIterator{
497         private:
498                 CkLocMgr *locMgr;
499                 PUP::er &p;
500         public:
501                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
502                 void addLocation(CkLocation &loc){
503                         CkArrayIndexMax idx = loc.getIndex();
504                         CkGroupID gID = locMgr->ckGetGroupID();
505                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
506                         CmiAssert(elt);
507                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
508                         p|gID;
509                         p|idx;
510                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
511                 }
512 };
513
514 void CkMemCheckPT::pupAllElements(PUP::er &p){
515 #if CMK_CHKP_ALL
516         int numElements;
517         if(!p.isUnpacking()){
518                 numElements = CkCountArrayElements();
519         }
520         p | numElements;
521         if(!p.isUnpacking()){
522                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
523         }
524 #endif
525 }
526
527 void CkMemCheckPT::startArrayCheckpoint(){
528 #if CMK_CHKP_ALL
529         int size;
530         {
531                 PUP::sizer psizer;
532                 pupAllElements(psizer);
533                 size = psizer.size();
534         }
535         int packSize = size/sizeof(double)+1;
536         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
537         msg->len = size;
538         msg->cp_flag = 1;
539         int budPEs[2];
540         msg->bud1=CkMyPe();
541         msg->bud2=ChkptOnPe(CkMyPe());
542         {
543                 PUP::toMem p(msg->packData);
544                 pupAllElements(p);
545         }
546         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
547         chkpTable[0] = msg;
548         recvCount++;
549 #endif
550 }
551
552 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
553 {
554 #if CMK_CHKP_ALL
555         int idx = 1;
556         if(msg->bud1 == CkMyPe()){
557                 idx = 0;
558         }
559         int isChkpting = msg->cp_flag;
560         chkpTable[idx] = msg;
561         if(isChkpting){
562                 recvCount++;
563                 if(recvCount == 2){
564                   if (where == CkCheckPoint_inMEM) {
565                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
566                   }
567                   else if (where == CkCheckPoint_inDISK) {
568                         // another barrier for finalize the writing using fsync
569                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
570                         contribute(0,NULL,CkReduction::sum_int,localcb);
571                   }
572                   else
573                         CmiAbort("Unknown checkpoint scheme");
574                   recvCount = 0;
575                 }
576         }
577 #endif
578 }
579
580 // don't handle array elements
581 static inline void _handleProcData(PUP::er &p)
582 {
583     // save readonlys, and callback BTW
584     CkPupROData(p);
585
586     // save mainchares 
587     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
588         
589 #ifndef CMK_CHARE_USE_PTR
590     // save non-migratable chare
591     CkPupChareData(p);
592 #endif
593
594     // save groups into Groups.dat
595     CkPupGroupData(p);
596
597     // save nodegroups into NodeGroups.dat
598     if(CkMyRank()==0) CkPupNodeGroupData(p);
599 }
600
601 void CkMemCheckPT::sendProcData()
602 {
603   // find out size of buffer
604   int size;
605   {
606     PUP::sizer p;
607     _handleProcData(p);
608     size = p.size();
609   }
610   int packSize = size;
611   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
612   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
613   {
614     PUP::toMem p(msg->packData);
615     _handleProcData(p);
616   }
617   msg->pe = CkMyPe();
618   msg->len = size;
619   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
620   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
621 }
622
623 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
624 {
625   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
626   CpvAccess(procChkptBuf) = msg;
627   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
628   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
629 }
630
631 // ArrayElement call this function to give us the checkpointed data
632 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
633 {
634   int len = ckTable.length();
635   int idx;
636   for (idx=0; idx<len; idx++) {
637     CkCheckPTInfo *entry = ckTable[idx];
638     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
639   }
640   CkAssert(idx < len);
641   int isChkpting = msg->cp_flag;
642   ckTable[idx]->updateBuffer(msg);
643   if (isChkpting) {
644       // all my array elements have returned their inmem data
645       // inform starter processor that I am done.
646     recvCount ++;
647     if (recvCount == ckTable.length()) {
648       if (where == CkCheckPoint_inMEM) {
649         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
650       }
651       else if (where == CkCheckPoint_inDISK) {
652         // another barrier for finalize the writing using fsync
653         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
654         contribute(0,NULL,CkReduction::sum_int,localcb);
655       }
656       else
657         CmiAbort("Unknown checkpoint scheme");
658       recvCount = 0;
659     } 
660   }
661 }
662
663 // only used in disk checkpointing
664 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
665 {
666   delete m;
667 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
668   system("sync");
669 #endif
670   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
671 }
672
673 // only is called on cpStarter when checkpoint is done
674 void CkMemCheckPT::cpFinish()
675 {
676   CmiAssert(CkMyPe() == cpStarter);
677   peCount++;
678     // now that all processors have finished, activate callback
679   if (peCount == 2) {
680     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
681     cpCallback.send();
682     peCount = 0;
683 #if !CMK_CHKP_ALL
684     thisProxy.report();
685 #endif
686   }
687 }
688
689 // for debugging, report checkpoint info
690 void CkMemCheckPT::report()
691 {
692   int objsize = 0;
693   int len = ckTable.length();
694   for (int i=0; i<len; i++) {
695     CkCheckPTInfo *entry = ckTable[i];
696     CmiAssert(entry);
697     objsize += entry->getSize();
698   }
699   CmiAssert(CpvAccess(procChkptBuf));
700 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
701 }
702
703 /*****************************************************************************
704                         RESTART Procedure
705 *****************************************************************************/
706
707 // master processor of two buddies
708 inline int CkMemCheckPT::isMaster(int buddype)
709 {
710 #if 0
711   int mype = CkMyPe();
712 //CkPrintf("ismaster: %d %d\n", pe, mype);
713   if (CkNumPes() - totalFailed() == 2) {
714     return mype > buddype;
715   }
716   for (int i=1; i<CkNumPes(); i++) {
717     int me = (buddype+i)%CkNumPes();
718     if (isFailed(me)) continue;
719     if (me == mype) return 1;
720     else return 0;
721   }
722   return 0;
723 #else
724     // smaller one
725   int mype = CkMyPe();
726 //CkPrintf("ismaster: %d %d\n", pe, mype);
727   if (CkNumPes() - totalFailed() == 2) {
728     return mype < buddype;
729   }
730 #if NODE_CHECKPOINT
731   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
732   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
733 #else
734   for (int i=1; i<CkNumPes(); i++) {
735 #endif
736     int me = (mype+i)%CkNumPes();
737     if (isFailed(me)) continue;
738     if (me == buddype) return 1;
739     else return 0;
740   }
741   return 0;
742 #endif
743 }
744
745
746
747 #if 0
748 // helper class to pup all elements that belong to same ckLocMgr
749 class ElementDestoryer : public CkLocIterator {
750 private:
751         CkLocMgr *locMgr;
752 public:
753         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
754         void addLocation(CkLocation &loc) {
755                 CkArrayIndex idx=loc.getIndex();
756                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
757                 loc.destroy();
758         }
759 };
760 #endif
761
762 // restore the bitmap vector for LB
763 void CkMemCheckPT::resetLB(int diepe)
764 {
765 #if CMK_LBDB_ON
766   int i;
767   char *bitmap = new char[CkNumPes()];
768   // set processor available bitmap
769   get_avail_vector(bitmap);
770
771   for (i=0; i<failedPes.length(); i++)
772     bitmap[failedPes[i]] = 0; 
773   bitmap[diepe] = 0;
774
775 #if CK_NO_PROC_POOL
776   set_avail_vector(bitmap);
777 #endif
778
779   // if I am the crashed pe, rebuild my failedPEs array
780   if (CkMyNode() == diepe)
781     for (i=0; i<CkNumPes(); i++) 
782       if (bitmap[i]==0) failed(i);
783
784   delete [] bitmap;
785 #endif
786 }
787
788 // in case when failedPe dies, everybody go through its checkpoint table:
789 // destory all array elements
790 // recover lost buddies
791 // reconstruct all array elements from check point data
792 // called on all processors
793 void CkMemCheckPT::restart(int diePe)
794 {
795 #if CMK_MEM_CHECKPOINT
796   double curTime = CmiWallTimer();
797   if (CkMyPe() == diePe)
798     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
799   stage = (char*)"resetLB";
800   startTime = curTime;
801   if (CkMyPe() == diePe)
802     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
803
804 #if CK_NO_PROC_POOL
805   failed(diePe);        // add into the list of failed pes
806 #endif
807   thisFailedPe = diePe;
808
809   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
810
811   inRestarting = 1;
812                                                                                 
813   // disable load balancer's barrier
814   if (CkMyPe() != diePe) resetLB(diePe);
815
816   CKLOCMGR_LOOP(mgr->startInserting(););
817
818   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
819   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
820 /*
821   if (CkMyPe() == 0)
822     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
823 */
824 #endif
825 }
826
827 // loally remove all array elements
828 void CkMemCheckPT::removeArrayElements()
829 {
830 #if CMK_MEM_CHECKPOINT
831   int len = ckTable.length();
832   double curTime = CmiWallTimer();
833   if (CkMyPe() == thisFailedPe) 
834     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
835   stage = (char*)"removeArrayElements";
836   startTime = curTime;
837
838   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
839   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
840
841   // get rid of all buffering and remote recs
842   // including destorying all array elements
843   CKLOCMGR_LOOP(mgr->flushAllRecs(););
844
845 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
846
847   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
848   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
849 #endif
850 }
851
852 // flush state in reduction manager
853 void CkMemCheckPT::resetReductionMgr()
854 {
855   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
856   int numGroups = CkpvAccess(_groupIDTable)->size();
857   for(int i=0;i<numGroups;i++) {
858     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
859     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
860     obj->flushStates();
861     obj->ckJustMigrated();
862   }
863   // reset again
864   //CpvAccess(_qd)->flushStates();
865
866 #if 1
867   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
868   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
869 #else
870   if (CkMyPe() == 0)
871     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
872 #endif
873 }
874
875 // recover the lost buddies
876 void CkMemCheckPT::recoverBuddies()
877 {
878   int idx;
879   int len = ckTable.length();
880   // ready to flush reduction manager
881   // cannot be CkMemCheckPT::restart because destory will modify states
882   double curTime = CmiWallTimer();
883   if (CkMyPe() == thisFailedPe)
884   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
885   stage = (char *)"recoverBuddies";
886   if (CkMyPe() == thisFailedPe)
887   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
888   startTime = curTime;
889
890   // recover buddies
891   expectCount = 0;
892 #if !CMK_CHKP_ALL
893   for (idx=0; idx<len; idx++) {
894     CkCheckPTInfo *entry = ckTable[idx];
895     if (entry->pNo == thisFailedPe) {
896 #if CK_NO_PROC_POOL
897       // find a new buddy
898       int budPe = BuddyPE(CkMyPe());
899 #else
900       int budPe = thisFailedPe;
901 #endif
902       CkArrayCheckPTMessage *msg = entry->getCopy();
903       msg->bud1 = budPe;
904       msg->bud2 = CkMyPe();
905       msg->cp_flag = 0;            // not checkpointing
906       thisProxy[budPe].recoverEntry(msg);
907       expectCount ++;
908     }
909   }
910 #else
911   //send to failed pe
912   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
913 #if CK_NO_PROC_POOL
914       // find a new buddy
915       int budPe = BuddyPE(CkMyPe());
916 #else
917       int budPe = thisFailedPe;
918 #endif
919       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
920       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
921           msg->cp_flag = 0;            // not checkpointing
922       msg->bud1 = budPe;
923       msg->bud2 = CkMyPe();
924       thisProxy[budPe].recoverEntry(msg);
925       expectCount ++;
926   }
927 #endif
928
929 #if 1
930   if (expectCount == 0) {
931     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
932     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
933   }
934 #else
935   if (CkMyPe() == 0) {
936     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
937   }
938 #endif
939
940   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
941 }
942
943 void CkMemCheckPT::gotData()
944 {
945   ackCount ++;
946   if (ackCount == expectCount) {
947     ackCount = 0;
948     expectCount = -1;
949     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
950     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
951   }
952 }
953
954 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
955 {
956   for (int i=0; i<n; i++) {
957     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
958     mgr->updateLocation(idx[i], nowOnPe);
959   }
960 }
961
962 // restore array elements
963 void CkMemCheckPT::recoverArrayElements()
964 {
965   double curTime = CmiWallTimer();
966   int len = ckTable.length();
967   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
968   stage = (char *)"recoverArrayElements";
969   if (CkMyPe() == thisFailedPe)
970   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
971   startTime = curTime;
972
973   // recover all array elements
974   int count = 0;
975 #if STREAMING_INFORMHOME
976   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
977   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
978 #endif
979 #if !CMK_CHKP_ALL
980   for (int idx=0; idx<len; idx++)
981   {
982     CkCheckPTInfo *entry = ckTable[idx];
983 #if CK_NO_PROC_POOL
984     // the bigger one will do 
985 //    if (CkMyPe() < entry->pNo) continue;
986     if (!isMaster(entry->pNo)) continue;
987 #else
988     // smaller one do it, which has the original object
989     if (CkMyPe() == entry->pNo+1 || 
990         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
991 #endif
992 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
993
994     entry->updateBuddy(CkMyPe(), entry->pNo);
995     CkArrayCheckPTMessage *msg = entry->getCopy();
996     // gzheng
997     //thisProxy[CkMyPe()].inmem_restore(msg);
998     inmem_restore(msg);
999 #if STREAMING_INFORMHOME
1000     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1001     int homePe = mgr->homePe(msg->index);
1002     if (homePe != CkMyPe()) {
1003       gmap[homePe].push_back(msg->locMgr);
1004       imap[homePe].push_back(msg->index);
1005     }
1006 #endif
1007     CkFreeMsg(msg);
1008     count ++;
1009   }
1010 #else
1011         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1012         recoverAll(msg,gmap,imap);
1013     CkFreeMsg(msg);
1014 #endif
1015 #if STREAMING_INFORMHOME
1016   for (int i=0; i<CkNumPes(); i++) {
1017     if (gmap[i].size() && i!=CkMyPe()) {
1018       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1019     }
1020   }
1021   delete [] imap;
1022   delete [] gmap;
1023 #endif
1024   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1025
1026   CKLOCMGR_LOOP(mgr->doneInserting(););
1027
1028   inRestarting = 0;
1029   // _crashedNode = -1;
1030   CpvAccess(_crashedNode) = -1;
1031
1032   if (CkMyPe() == 0)
1033     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1034 }
1035
1036 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1037 #if CMK_CHKP_ALL
1038         PUP::fromMem p(msg->packData);
1039         int numElements;
1040         p|numElements;
1041         if(p.isUnpacking()){
1042                 for(int i=0;i<numElements;i++){
1043                         CkGroupID gID;
1044                         CkArrayIndex idx;
1045                         p|gID;
1046                         p|idx;
1047                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1048 #if STREAMING_INFORMHOME
1049                         mgr->resume(idx,p,CmiFalse);
1050 #else
1051                         mgr->resume(idx,p,CmiTrue);
1052 #endif
1053                           /*CkLocRec_local *rec = loc.getLocalRecord();
1054                           CmiAssert(rec);
1055                           CkVec<CkMigratable *> list;
1056                           mgr->migratableList(rec, list);
1057                           CmiAssert(list.length() > 0);
1058                           for (int l=0; l<list.length(); l++) {
1059                                 ArrayElement * elt = (ArrayElement *)list[l];
1060                                 //    reset, may not needed now
1061                                 // for now.
1062                                 for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
1063                                   contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
1064                                   if (c) c->redNo = 0;
1065                                 }
1066                           }*/
1067 #if STREAMING_INFORMHOME
1068                         int homePe = mgr->homePe(idx);
1069                         if (homePe != CkMyPe()) {
1070                           gmap[homePe].push_back(gID);
1071                           imap[homePe].push_back(idx);
1072                         }
1073 #endif
1074                 }
1075         }
1076 #endif
1077 }
1078
1079 static double restartT;
1080
1081 // on every processor
1082 // turn load balancer back on
1083 void CkMemCheckPT::finishUp()
1084 {
1085   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1086   //CKLOCMGR_LOOP(mgr->doneInserting(););
1087   
1088   if (CkMyPe() == thisFailedPe)
1089   {
1090        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1091        //CkStartQD(cpCallback);
1092        cpCallback.send();
1093        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1094   }
1095
1096 #if CK_NO_PROC_POOL
1097 #if NODE_CHECKPOINT
1098   int numnodes = CmiNumPhysicalNodes();
1099 #else
1100   int numnodes = CkNumPes();
1101 #endif
1102   if (numnodes-totalFailed() <=2) {
1103     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1104     _memChkptOn = 0;
1105   }
1106 #endif
1107 }
1108
1109 // called only on 0
1110 void CkMemCheckPT::quiescence(CkCallback &cb)
1111 {
1112   static int pe_count = 0;
1113   pe_count ++;
1114   CmiAssert(CkMyPe() == 0);
1115   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1116   if (pe_count == CkNumPes()) {
1117     pe_count = 0;
1118     cb.send();
1119   }
1120 }
1121
1122 // User callable function - to start a checkpoint
1123 // callback cb is used to pass control back
1124 void CkStartMemCheckpoint(CkCallback &cb)
1125 {
1126 #if CMK_MEM_CHECKPOINT
1127   if (_memChkptOn == 0) {
1128     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1129     cb.send();
1130     return;
1131   }
1132   if (CkInRestarting()) {
1133       // trying to checkpointing during restart
1134     cb.send();
1135     return;
1136   }
1137     // store user callback and user data
1138   CkMemCheckPT::cpCallback = cb;
1139
1140     // broadcast to start check pointing
1141   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1142   checkptMgr.doItNow(CkMyPe(), cb);
1143 #else
1144   // when mem checkpoint is disabled, invike cb immediately
1145   cb.send();
1146 #endif
1147 }
1148
1149 void CkRestartCheckPoint(int diePe)
1150 {
1151 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1152   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1153   // broadcast
1154   checkptMgr.restart(diePe);
1155 }
1156
1157 static int _diePE = -1;
1158
1159 // callback function used locally by ccs handler
1160 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1161 {
1162 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1163   CkRestartCheckPoint(_diePE);
1164 }
1165
1166 // Converse function handles
1167 static int askPhaseHandlerIdx;
1168 static int recvPhaseHandlerIdx;
1169 static int askProcDataHandlerIdx;
1170 static int restartBcastHandlerIdx;
1171 static int recoverProcDataHandlerIdx;
1172 static int restartBeginHandlerIdx;
1173 static int notifyHandlerIdx;
1174
1175 // called on crashed PE
1176 static void restartBeginHandler(char *msg)
1177 {
1178 #if CMK_MEM_CHECKPOINT
1179   CmiFree(msg);
1180   static int count = 0;
1181   CmiAssert(CkMyPe() == _diePE);
1182   count ++;
1183   if (count == CkNumPes()) {
1184     CkRestartCheckPointCallback(NULL, NULL);
1185     count = 0;
1186   }
1187 #endif
1188 }
1189
1190 extern void _discard_charm_message();
1191 extern void _resume_charm_message();
1192
1193 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1194         return data;
1195 }
1196
1197 static void restartBcastHandler(char *msg)
1198 {
1199 #if CMK_MEM_CHECKPOINT
1200   // advance phase counter
1201   CkMemCheckPT::inRestarting = 1;
1202   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1203   // gzheng
1204   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1205
1206   if (CkMyPe()==_diePE)
1207     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1208
1209   // reset QD counters
1210 /*  gzheng
1211   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1212 */
1213
1214 /*  gzheng
1215   if (CkMyPe()==_diePE)
1216       CkRestartCheckPointCallback(NULL, NULL);
1217 */
1218   CmiFree(msg);
1219
1220   _resume_charm_message();
1221
1222     // reduction
1223   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1224   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1225   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1226   checkpointed = 0;
1227 #endif
1228 }
1229
1230 extern void _initDone();
1231
1232 // called on crashed processor
1233 static void recoverProcDataHandler(char *msg)
1234 {
1235 #if CMK_MEM_CHECKPOINT
1236    int i;
1237    envelope *env = (envelope *)msg;
1238    CkUnpackMessage(&env);
1239    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1240    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1241    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1242    //cur_restart_phase ++;
1243      // gzheng ?
1244    //CpvAccess(_qd)->flushStates();
1245
1246    // restore readonly, mainchare, group, nodegroup
1247 //   int temp = cur_restart_phase;
1248 //   cur_restart_phase = -1;
1249    PUP::fromMem p(procMsg->packData);
1250    _handleProcData(p);
1251
1252    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1253    // gzheng
1254    CKLOCMGR_LOOP(mgr->startInserting(););
1255
1256    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1257    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1258    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1259    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1260
1261    _initDone();
1262 //   CpvAccess(_qd)->flushStates();
1263    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1264 #endif
1265 }
1266
1267 // called on its backup processor
1268 // get backup message buffer and sent to crashed processor
1269 static void askProcDataHandler(char *msg)
1270 {
1271 #if CMK_MEM_CHECKPOINT
1272     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1273     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1274     if (CpvAccess(procChkptBuf) == NULL)  {
1275       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1276       CkAbort("no checkpoint found");
1277     }
1278     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1279     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1280
1281     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1282
1283     CkPackMessage(&env);
1284     CmiSetHandler(env, recoverProcDataHandlerIdx);
1285     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1286     CpvAccess(procChkptBuf) = NULL;
1287 #endif
1288 }
1289
1290 // called on PE 0
1291 void qd_callback(void *m)
1292 {
1293    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1294    CkFreeMsg(m);
1295 #ifdef CMK_SMP
1296    for(int i=0;i<CmiMyNodeSize();i++){
1297    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1298    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1299         CmiSetHandler(msg, askProcDataHandlerIdx);
1300         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1301         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1302    }
1303    return;
1304 #endif
1305    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1306    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1307    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1308    CmiSetHandler(msg, askProcDataHandlerIdx);
1309    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1310    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1311
1312 }
1313
1314 // on crashed node
1315 void CkMemRestart(const char *dummy, CkArgMsg *args)
1316 {
1317 #if CMK_MEM_CHECKPOINT
1318    _diePE = CmiMyNode();
1319    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1320    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1321    CkMemCheckPT::inRestarting = 1;
1322
1323   CpvAccess( _crashedNode )= CmiMyNode();
1324         
1325   _discard_charm_message();
1326   
1327   if(CmiMyRank()==0){
1328     CkCallback cb(qd_callback);
1329     CkStartQD(cb);
1330     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1331   }
1332 #else
1333    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1334 #endif
1335 }
1336
1337 // can be called in other files
1338 // return true if it is in restarting
1339 extern "C"
1340 int CkInRestarting()
1341 {
1342 #if CMK_MEM_CHECKPOINT
1343   if (CpvAccess( _crashedNode)!=-1) return 1;
1344   // gzheng
1345   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1346   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1347   return CkMemCheckPT::inRestarting;
1348 #else
1349   return 0;
1350 #endif
1351 }
1352
1353 extern "C"
1354 void CkSetInLdb(){
1355 #if CMK_MEM_CHECKPOINT
1356         CkMemCheckPT::inLoadbalancing = 1;
1357 #endif
1358 }
1359
1360 extern "C"
1361 int CkInLdb(){
1362 #if CMK_MEM_CHECKPOINT
1363         return CkMemCheckPT::inLoadbalancing;
1364 #endif
1365 }
1366
1367 extern "C"
1368 void CkResetInLdb(){
1369 #if CMK_MEM_CHECKPOINT
1370         CkMemCheckPT::inLoadbalancing = 0;
1371 #endif
1372 }
1373
1374 /*****************************************************************************
1375                 module initialization
1376 *****************************************************************************/
1377
1378 static int arg_where = CkCheckPoint_inMEM;
1379
1380 #if CMK_MEM_CHECKPOINT
1381 void init_memcheckpt(char **argv)
1382 {
1383     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1384       arg_where = CkCheckPoint_inDISK;
1385     }
1386
1387         // initiliazing _crashedNode variable
1388         CpvInitialize(int, _crashedNode);
1389         CpvAccess(_crashedNode) = -1;
1390
1391 }
1392 #endif
1393
1394 class CkMemCheckPTInit: public Chare {
1395 public:
1396   CkMemCheckPTInit(CkArgMsg *m) {
1397 #if CMK_MEM_CHECKPOINT
1398     if (arg_where == CkCheckPoint_inDISK) {
1399       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1400     }
1401     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1402     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1403 #endif
1404   }
1405 };
1406
1407 static void notifyHandler(char *msg)
1408 {
1409 #if CMK_MEM_CHECKPOINT
1410   CmiFree(msg);
1411       /* immediately increase restart phase to filter old messages */
1412   CpvAccess(_curRestartPhase) ++;
1413   CpvAccess(_qd)->flushStates();
1414   _discard_charm_message();
1415 #endif
1416 }
1417
1418 extern "C"
1419 void notify_crash(int node)
1420 {
1421 #ifdef CMK_MEM_CHECKPOINT
1422   CpvAccess( _crashedNode) = node;
1423 #ifdef CMK_SMP
1424   for(int i=0;i<CkMyNodeSize();i++){
1425         CpvAccessOther(_crashedNode,i)=node;
1426   }
1427 #endif
1428   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1429   CkMemCheckPT::inRestarting = 1;
1430
1431     // this may be in interrupt handler, send a message to reset QD
1432   int pe = CmiNodeFirst(CkMyNode());
1433   for(int i=0;i<CkMyNodeSize();i++){
1434         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1435         CmiSetHandler(msg, notifyHandlerIdx);
1436         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1437   }
1438 #endif
1439 }
1440
1441 extern "C" void (*notify_crash_fn)(int node);
1442
1443 #if CMK_CONVERSE_MPI
1444 static int pingHandlerIdx;
1445 static int pingCheckHandlerIdx;
1446 static int buddyDieHandlerIdx;
1447 static double lastPingTime = -1;
1448
1449 extern "C" void mpi_restart_crashed(int pe, int rank);
1450 extern "C" int  find_spare_mpirank(int pe);
1451
1452 void pingBuddy();
1453 void pingCheckHandler();
1454
1455 void buddyDieHandler(char *msg)
1456 {
1457 #if CMK_MEM_CHECKPOINT
1458    // notify
1459    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1460    notify_crash(diepe);
1461    // send message to crash pe to let it restart
1462    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1463    int newrank = find_spare_mpirank(diepe);
1464    int buddy = obj->BuddyPE(CmiMyPe());
1465    if (buddy == diepe)  {
1466      mpi_restart_crashed(diepe, newrank);
1467      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1468    }
1469 #endif
1470 }
1471
1472 void pingHandler(void *msg)
1473 {
1474   lastPingTime = CmiWallTimer();
1475   CmiFree(msg);
1476 }
1477
1478 void pingCheckHandler()
1479 {
1480 #if CMK_MEM_CHECKPOINT
1481   double now = CmiWallTimer();
1482   if (lastPingTime > 0 && now - lastPingTime > 4 && (!CkInLdb()||buddy!=0)) {
1483     int i, pe, buddy;
1484     // tell everyone the buddy dies
1485     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1486     for (i = 1; i < CmiNumPes(); i++) {
1487        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1488        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1489     }
1490     buddy = pe;
1491     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1492     for (int pe = 0; pe < CmiNumPes(); pe++) {
1493       if (obj->isFailed(pe) || pe == buddy) continue;
1494       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1495       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1496       CmiSetHandler(msg, buddyDieHandlerIdx);
1497       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1498     }
1499   }
1500   else 
1501     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1502 #endif
1503 }
1504
1505 void pingBuddy()
1506 {
1507 #if CMK_MEM_CHECKPOINT
1508   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1509   if (obj) {
1510     int buddy = obj->BuddyPE(CkMyPe());
1511 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1512     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1513     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1514     CmiSetHandler(msg, pingHandlerIdx);
1515     CmiGetRestartPhase(msg) = 9999;
1516     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1517   }
1518   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1519 #endif
1520 }
1521 #endif
1522
1523 // initproc
1524 void CkRegisterRestartHandler( )
1525 {
1526 #if CMK_MEM_CHECKPOINT
1527   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1528   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1529   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1530   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1531   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1532
1533 #if CMK_CONVERSE_MPI
1534   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1535   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1536   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1537 #endif
1538
1539   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1540   CpvAccess(procChkptBuf) = NULL;
1541
1542   notify_crash_fn = notify_crash;
1543
1544 #if ! CMK_CONVERSE_MPI
1545   // print pid to kill
1546 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1547 //  sleep(4);
1548 #endif
1549 #endif
1550 }
1551
1552
1553 extern "C"
1554 int CkHasCheckpoints()
1555 {
1556   return checkpointed;
1557 }
1558
1559 /// @todo: the following definitions should be moved to a separate file containing
1560 // structures and functions about fault tolerance strategies
1561
1562 /**
1563  *  * @brief: function for killing a process                                             
1564  *   */
1565 #ifdef CMK_MEM_CHECKPOINT
1566 #if CMK_HAS_GETPID
1567 void killLocal(void *_dummy,double curWallTime){
1568         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1569         if(CmiWallTimer()<killTime-1){
1570                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1571         }else{ 
1572 #if CMK_CONVERSE_MPI
1573                                 CkDieNow();
1574 #else 
1575                 kill(getpid(),SIGKILL);                                               
1576 #endif
1577         }              
1578
1579 #else
1580 void killLocal(void *_dummy,double curWallTime){
1581   CmiAbort("kill() not supported!");
1582 }
1583 #endif
1584 #endif
1585
1586 #ifdef CMK_MEM_CHECKPOINT
1587 /**
1588  * @brief: reads the file with the kill information
1589  */
1590 void readKillFile(){
1591         FILE *fp=fopen(killFile,"r");
1592         if(!fp){
1593                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1594                 return;
1595         }
1596         int proc;
1597         double sec;
1598         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1599                 if(proc == CkMyNode() && CkMyRank() == 0){
1600                         killTime = CmiWallTimer()+sec;
1601                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1602                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1603                 }
1604         }
1605         fclose(fp);
1606 }
1607
1608 #if ! CMK_CONVERSE_MPI
1609 void CkDieNow()
1610 {
1611          // ignored for non-mpi version
1612         CmiPrintf("[%d] die now.\n", CmiMyPe());
1613         killTime = CmiWallTimer()+0.001;
1614         CcdCallFnAfter(killLocal,NULL,1);
1615 }
1616 #endif
1617
1618 #endif
1619
1620 #include "CkMemCheckpoint.def.h"
1621
1622