fix
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74
75 // static, so that it is accessible from Converse part
76 int CkMemCheckPT::inRestarting = 0;
77 int CkMemCheckPT::inLoadbalancing = 0;
78 double CkMemCheckPT::startTime;
79 char *CkMemCheckPT::stage;
80 CkCallback CkMemCheckPT::cpCallback;
81
82 int _memChkptOn = 1;                    // checkpoint is on or off
83
84 CkGroupID ckCheckPTGroupID;             // readonly
85
86 static int checkpointed = 0;
87
88 /// @todo the following declarations should be moved into a separate file for all 
89 // fault tolerant strategies
90
91 #ifdef CMK_MEM_CHECKPOINT
92 // name of the kill file that contains processes to be killed 
93 char *killFile;                                               
94 // flag for the kill file         
95 int killFlag=0;
96 // variable for storing the killing time
97 double killTime=0.0;
98 #endif
99
100 #ifdef CKLOCMGR_LOOP
101 #undef CKLOCMGR_LOOP
102 #endif
103 // loop over all CkLocMgr and do "code"
104 #define  CKLOCMGR_LOOP(code)    {       \
105   int numGroups = CkpvAccess(_groupIDTable)->size();    \
106   for(int i=0;i<numGroups;i++) {        \
107     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
108     if(obj->isLocMgr())  {      \
109       CkLocMgr *mgr = (CkLocMgr*)obj;   \
110       code      \
111     }   \
112   }     \
113  }
114
115 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
116 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
117
118 // compute the backup processor
119 // FIXME: avoid crashed processors
120 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
121
122 inline int CkMemCheckPT::BuddyPE(int pe)
123 {
124   int budpe;
125 #if NODE_CHECKPOINT
126     // buddy is the processor with same rank on the next physical node
127   int r1 = CmiPhysicalRank(pe);
128   int budnode = CmiPhysicalNodeID(pe);
129   do {
130     budnode = (budnode+1)%CmiNumPhysicalNodes();
131     int *pelist;
132     int num;
133     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
134     budpe = pelist[r1 % num];
135   } while (isFailed(budpe));
136   if (budpe == pe) {
137     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
138     CmiAbort("Failed to find a buddy processor");
139   }
140 #else
141   budpe = pe;
142   while (budpe == pe || isFailed(budpe)) 
143           budpe = (budpe+1)%CkNumPes();
144 #endif
145   return budpe;
146 }
147
148 // called in array element constructor
149 // choose and register with 2 buddies for checkpoiting 
150 #if CMK_MEM_CHECKPOINT
151 void ArrayElement::init_checkpt() {
152         if (_memChkptOn == 0) return;
153         if (CkInRestarting()) {
154           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
155         }
156         // only master init checkpoint
157         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
158
159         budPEs[0] = CkMyPe();
160         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
161         CmiAssert(budPEs[0] != budPEs[1]);
162         // inform checkPTMgr
163         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
164         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
165         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
166         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
167 }
168 #endif
169
170 // entry function invoked by checkpoint mgr asking for checkpoint data
171 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
172 #if CMK_MEM_CHECKPOINT
173 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
174 //char index[128];   thisIndexMax.sprint(index);
175 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
176   CkLocMgr *locMgr = thisArray->getLocMgr();
177   CmiAssert(myRec!=NULL);
178   int size;
179   {
180         PUP::sizer p;
181         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
182         size = p.size();
183   }
184   int packSize = size/sizeof(double) +1;
185   CkArrayCheckPTMessage *msg =
186                  new (packSize, 0) CkArrayCheckPTMessage;
187   msg->len = size;
188   msg->index =thisIndexMax;
189   msg->aid = thisArrayID;
190   msg->locMgr = locMgr->getGroupID();
191   msg->cp_flag = 1;
192   {
193         PUP::toMem p(msg->packData);
194         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
195   }
196
197   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
198   checkptMgr.recvData(msg, 2, budPEs);
199   delete m;
200 #endif
201 }
202
203 // checkpoint holder class - for memory checkpointing
204 class CkMemCheckPTInfo: public CkCheckPTInfo
205 {
206   CkArrayCheckPTMessage *ckBuffer;
207 public:
208   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
209                     CkCheckPTInfo(a, loc, idx, pno)
210   {
211     ckBuffer = NULL;
212   }
213   ~CkMemCheckPTInfo() 
214   {
215     if (ckBuffer) delete ckBuffer; 
216   }
217   inline void updateBuffer(CkArrayCheckPTMessage *data) 
218   {
219     CmiAssert(data!=NULL);
220     if (ckBuffer) delete ckBuffer;
221     ckBuffer = data;
222   }    
223   inline CkArrayCheckPTMessage * getCopy()
224   {
225     if (ckBuffer == NULL) {
226       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
227       CmiAbort("Abort!");
228     }
229     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
230   }     
231   inline void updateBuddy(int b1, int b2) {
232      CmiAssert(ckBuffer);
233      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
234      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
235      CmiAssert(pNo != CkMyPe());
236   }
237   inline int getSize() { 
238      CmiAssert(ckBuffer);
239      return ckBuffer->len; 
240   }
241 };
242
243 // checkpoint holder class - for in-disk checkpointing
244 class CkDiskCheckPTInfo: public CkCheckPTInfo 
245 {
246   char *fname;
247   int bud1, bud2;
248   int len;                      // checkpoint size
249 public:
250   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
251   {
252 #if CMK_USE_MKSTEMP
253     fname = new char[64];
254     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
255     mkstemp(fname);
256 #else
257     fname=tmpnam(NULL);
258 #endif
259     bud1 = bud2 = -1;
260     len = 0;
261   }
262   ~CkDiskCheckPTInfo() 
263   {
264     remove(fname);
265   }
266   inline void updateBuffer(CkArrayCheckPTMessage *data) 
267   {
268     double t = CmiWallTimer();
269     // unpack it
270     envelope *env = UsrToEnv(data);
271     CkUnpackMessage(&env);
272     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
273     FILE *f = fopen(fname,"wb");
274     PUP::toDisk p(f);
275     CkPupMessage(p, (void **)&data);
276     // delay sync to the end because otherwise the messages are blocked
277 //    fsync(fileno(f));
278     fclose(f);
279     bud1 = data->bud1;
280     bud2 = data->bud2;
281     len = data->len;
282     delete data;
283     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
284   }
285   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
286   {
287     CkArrayCheckPTMessage *data;
288     FILE *f = fopen(fname,"rb");
289     PUP::fromDisk p(f);
290     CkPupMessage(p, (void **)&data);
291     fclose(f);
292     data->bud1 = bud1;                          // update the buddies
293     data->bud2 = bud2;
294     return data;
295   }
296   inline void updateBuddy(int b1, int b2) {
297      bud1 = b1; bud2 = b2;
298      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
299      CmiAssert(pNo != CkMyPe());
300   }
301   inline int getSize() { 
302      return len; 
303   }
304 };
305
306 CkMemCheckPT::CkMemCheckPT(int w)
307 {
308   int numnodes = 0;
309 #if NODE_CHECKPOINT
310   numnodes = CmiNumPhysicalNodes();
311 #else
312   numnodes = CkNumPes();
313 #endif
314 #if CK_NO_PROC_POOL
315   if (numnodes <= 2)
316 #else
317   if (numnodes  == 1)
318 #endif
319   {
320     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
321     _memChkptOn = 0;
322   }
323   inRestarting = 0;
324   recvCount = peCount = 0;
325   ackCount = 0;
326   expectCount = -1;
327   where = w;
328
329 #if CMK_CONVERSE_MPI
330   void pingBuddy();
331   void pingCheckHandler();
332   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
333   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
334 #endif
335 }
336
337 CkMemCheckPT::~CkMemCheckPT()
338 {
339   int len = ckTable.length();
340   for (int i=0; i<len; i++) {
341     delete ckTable[i];
342   }
343 }
344
345 void CkMemCheckPT::pup(PUP::er& p) 
346
347   CBase_CkMemCheckPT::pup(p); 
348   p|cpStarter;
349   p|thisFailedPe;
350   p|failedPes;
351   p|ckCheckPTGroupID;           // recover global variable
352   p|cpCallback;                 // store callback
353   p|where;                      // where to checkpoint
354   p|peCount;
355   if (p.isUnpacking()) {
356     recvCount = 0;
357 #if CMK_CONVERSE_MPI
358     void pingBuddy();
359     void pingCheckHandler();
360     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
361     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
362 #endif
363   }
364 }
365
366 // called by checkpoint mgr to restore an array element
367 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
368 {
369 #if CMK_MEM_CHECKPOINT
370   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
371   // m->index.print();
372   PUP::fromMem p(m->packData);
373   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
374   CmiAssert(mgr);
375 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
376   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
377 #else
378   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
379 #endif
380
381   // find a list of array elements bound together
382   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
383   CmiAssert(elt);
384   CkLocRec_local *rec = elt->myRec;
385   CkVec<CkMigratable *> list;
386   mgr->migratableList(rec, list);
387   CmiAssert(list.length() > 0);
388   for (int l=0; l<list.length(); l++) {
389     elt = (ArrayElement *)list[l];
390     elt->budPEs[0] = m->bud1;
391     elt->budPEs[1] = m->bud2;
392     //    reset, may not needed now
393     // for now.
394     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
395       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
396       if (c) c->redNo = 0;
397     }
398   }
399 #endif
400 }
401
402 // return 1 if pe is a crashed processor
403 int CkMemCheckPT::isFailed(int pe)
404 {
405   for (int i=0; i<failedPes.length(); i++)
406     if (failedPes[i] == pe) return 1;
407   return 0;
408 }
409
410 // add pe into history list of all failed processors
411 void CkMemCheckPT::failed(int pe)
412 {
413   if (isFailed(pe)) return;
414   failedPes.push_back(pe);
415 }
416
417 int CkMemCheckPT::totalFailed()
418 {
419   return failedPes.length();
420 }
421
422 // create an checkpoint entry for array element of aid with index.
423 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
424 {
425   // error check, no duplicate
426   int idx, len = ckTable.size();
427   for (idx=0; idx<len; idx++) {
428     CkCheckPTInfo *entry = ckTable[idx];
429     if (index == entry->index) {
430       if (loc == entry->locMgr) {
431           // bindTo array elements
432           return;
433       }
434         // for array inheritance, the following check may fail
435         // because ArrayElement constructor of all superclasses are called
436       if (aid == entry->aid) {
437         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
438         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
439       }
440     }
441   }
442   CkCheckPTInfo *newEntry;
443   if (where == CkCheckPoint_inMEM)
444     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
445   else
446     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
447   ckTable.push_back(newEntry);
448   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
449 }
450
451 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
452 {
453 #if !CMK_CHKP_ALL       
454   int buddy = msg->bud1;
455   if (buddy == CkMyPe()) buddy = msg->bud2;
456   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
457   recvData(msg);
458     // ack
459   thisProxy[buddy].gotData();
460 #else
461   recvArrayCheckpoint(msg);
462   thisProxy[msg->bud2].gotData();
463 #endif
464 }
465
466 // loop through my checkpoint table and ask checkpointed array elements
467 // to send me checkpoint data.
468 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
469 {
470   checkpointed = 1;
471   cpCallback = cb;
472   cpStarter = starter;
473   if (CkMyPe() == cpStarter) {
474     startTime = CmiWallTimer();
475     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
476   }
477 #if !CMK_CHKP_ALL
478   int len = ckTable.length();
479   for (int i=0; i<len; i++) {
480     CkCheckPTInfo *entry = ckTable[i];
481       // always let the bigger number processor send request
482     //if (CkMyPe() < entry->pNo) continue;
483       // always let the smaller number processor send request, may on same proc
484     if (!isMaster(entry->pNo)) continue;
485       // call inmem_checkpoint to the array element, ask it to send
486       // back checkpoint data via recvData().
487     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
488     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
489   }
490     // if my table is empty, then I am done
491   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
492 #else
493   startArrayCheckpoint();
494 #endif
495   // pack and send proc level data
496   sendProcData();
497 }
498
499 class MemElementPacker : public CkLocIterator{
500         private:
501                 CkLocMgr *locMgr;
502                 PUP::er &p;
503         public:
504                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
505                 void addLocation(CkLocation &loc){
506                         CkArrayIndexMax idx = loc.getIndex();
507                         CkGroupID gID = locMgr->ckGetGroupID();
508                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
509                         CmiAssert(elt);
510                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
511                         p|gID;
512                         p|idx;
513                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
514                 }
515 };
516
517 void CkMemCheckPT::pupAllElements(PUP::er &p){
518 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
519         int numElements;
520         if(!p.isUnpacking()){
521                 numElements = CkCountArrayElements();
522         }
523         p | numElements;
524         if(!p.isUnpacking()){
525                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
526         }
527 #endif
528 }
529
530 void CkMemCheckPT::startArrayCheckpoint(){
531 #if CMK_CHKP_ALL
532         int size;
533         {
534                 PUP::sizer psizer;
535                 pupAllElements(psizer);
536                 size = psizer.size();
537         }
538         int packSize = size/sizeof(double)+1;
539  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
540         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
541         msg->len = size;
542         msg->cp_flag = 1;
543         int budPEs[2];
544         msg->bud1=CkMyPe();
545         msg->bud2=ChkptOnPe(CkMyPe());
546         {
547                 PUP::toMem p(msg->packData);
548                 pupAllElements(p);
549         }
550         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
551         chkpTable[0] = msg;
552         recvCount++;
553 #endif
554 }
555
556 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
557 {
558 #if CMK_CHKP_ALL
559         int idx = 1;
560         if(msg->bud1 == CkMyPe()){
561                 idx = 0;
562         }
563         int isChkpting = msg->cp_flag;
564         chkpTable[idx] = msg;
565         if(isChkpting){
566                 recvCount++;
567                 if(recvCount == 2){
568                   if (where == CkCheckPoint_inMEM) {
569                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
570                   }
571                   else if (where == CkCheckPoint_inDISK) {
572                         // another barrier for finalize the writing using fsync
573                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
574                         contribute(0,NULL,CkReduction::sum_int,localcb);
575                   }
576                   else
577                         CmiAbort("Unknown checkpoint scheme");
578                   recvCount = 0;
579                 }
580         }
581 #endif
582 }
583
584 // don't handle array elements
585 static inline void _handleProcData(PUP::er &p)
586 {
587     // save readonlys, and callback BTW
588     CkPupROData(p);
589
590     // save mainchares 
591     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
592         
593 #ifndef CMK_CHARE_USE_PTR
594     // save non-migratable chare
595     CkPupChareData(p);
596 #endif
597
598     // save groups into Groups.dat
599     CkPupGroupData(p);
600
601     // save nodegroups into NodeGroups.dat
602     if(CkMyRank()==0) CkPupNodeGroupData(p);
603 }
604
605 void CkMemCheckPT::sendProcData()
606 {
607   // find out size of buffer
608   int size;
609   {
610     PUP::sizer p;
611     _handleProcData(p);
612     size = p.size();
613   }
614   int packSize = size;
615   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
616   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
617   {
618     PUP::toMem p(msg->packData);
619     _handleProcData(p);
620   }
621   msg->pe = CkMyPe();
622   msg->len = size;
623   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
624   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
625 }
626
627 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
628 {
629   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
630   CpvAccess(procChkptBuf) = msg;
631   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
632   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
633 }
634
635 // ArrayElement call this function to give us the checkpointed data
636 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
637 {
638   int len = ckTable.length();
639   int idx;
640   for (idx=0; idx<len; idx++) {
641     CkCheckPTInfo *entry = ckTable[idx];
642     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
643   }
644   CkAssert(idx < len);
645   int isChkpting = msg->cp_flag;
646   ckTable[idx]->updateBuffer(msg);
647   if (isChkpting) {
648       // all my array elements have returned their inmem data
649       // inform starter processor that I am done.
650     recvCount ++;
651     if (recvCount == ckTable.length()) {
652       if (where == CkCheckPoint_inMEM) {
653         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
654       }
655       else if (where == CkCheckPoint_inDISK) {
656         // another barrier for finalize the writing using fsync
657         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
658         contribute(0,NULL,CkReduction::sum_int,localcb);
659       }
660       else
661         CmiAbort("Unknown checkpoint scheme");
662       recvCount = 0;
663     } 
664   }
665 }
666
667 // only used in disk checkpointing
668 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
669 {
670   delete m;
671 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
672   system("sync");
673 #endif
674   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
675 }
676
677 // only is called on cpStarter when checkpoint is done
678 void CkMemCheckPT::cpFinish()
679 {
680   CmiAssert(CkMyPe() == cpStarter);
681   peCount++;
682     // now that all processors have finished, activate callback
683   if (peCount == 2) 
684 {
685     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
686     cpCallback.send();
687     peCount = 0;
688     thisProxy.report();
689   }
690 }
691
692 // for debugging, report checkpoint info
693 void CkMemCheckPT::report()
694 {
695 #if !CMK_CHKP_ALL
696   int objsize = 0;
697   int len = ckTable.length();
698   for (int i=0; i<len; i++) {
699     CkCheckPTInfo *entry = ckTable[i];
700     CmiAssert(entry);
701     objsize += entry->getSize();
702   }
703   CmiAssert(CpvAccess(procChkptBuf));
704   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
705 #else
706   if(CkMyPe()==0)
707   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
708 #endif
709 }
710
711 /*****************************************************************************
712                         RESTART Procedure
713 *****************************************************************************/
714
715 // master processor of two buddies
716 inline int CkMemCheckPT::isMaster(int buddype)
717 {
718 #if 0
719   int mype = CkMyPe();
720 //CkPrintf("ismaster: %d %d\n", pe, mype);
721   if (CkNumPes() - totalFailed() == 2) {
722     return mype > buddype;
723   }
724   for (int i=1; i<CkNumPes(); i++) {
725     int me = (buddype+i)%CkNumPes();
726     if (isFailed(me)) continue;
727     if (me == mype) return 1;
728     else return 0;
729   }
730   return 0;
731 #else
732     // smaller one
733   int mype = CkMyPe();
734 //CkPrintf("ismaster: %d %d\n", pe, mype);
735   if (CkNumPes() - totalFailed() == 2) {
736     return mype < buddype;
737   }
738 #if NODE_CHECKPOINT
739   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
740   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
741 #else
742   for (int i=1; i<CkNumPes(); i++) {
743 #endif
744     int me = (mype+i)%CkNumPes();
745     if (isFailed(me)) continue;
746     if (me == buddype) return 1;
747     else return 0;
748   }
749   return 0;
750 #endif
751 }
752
753
754
755 #if 0
756 // helper class to pup all elements that belong to same ckLocMgr
757 class ElementDestoryer : public CkLocIterator {
758 private:
759         CkLocMgr *locMgr;
760 public:
761         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
762         void addLocation(CkLocation &loc) {
763                 CkArrayIndex idx=loc.getIndex();
764                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
765                 loc.destroy();
766         }
767 };
768 #endif
769
770 // restore the bitmap vector for LB
771 void CkMemCheckPT::resetLB(int diepe)
772 {
773 #if CMK_LBDB_ON
774   int i;
775   char *bitmap = new char[CkNumPes()];
776   // set processor available bitmap
777   get_avail_vector(bitmap);
778
779   for (i=0; i<failedPes.length(); i++)
780     bitmap[failedPes[i]] = 0; 
781   bitmap[diepe] = 0;
782
783 #if CK_NO_PROC_POOL
784   set_avail_vector(bitmap);
785 #endif
786
787   // if I am the crashed pe, rebuild my failedPEs array
788   if (CkMyNode() == diepe)
789     for (i=0; i<CkNumPes(); i++) 
790       if (bitmap[i]==0) failed(i);
791
792   delete [] bitmap;
793 #endif
794 }
795
796 // in case when failedPe dies, everybody go through its checkpoint table:
797 // destory all array elements
798 // recover lost buddies
799 // reconstruct all array elements from check point data
800 // called on all processors
801 void CkMemCheckPT::restart(int diePe)
802 {
803 #if CMK_MEM_CHECKPOINT
804   double curTime = CmiWallTimer();
805   if (CkMyPe() == diePe)
806     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
807   stage = (char*)"resetLB";
808   startTime = curTime;
809   if (CkMyPe() == diePe)
810     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
811
812 #if CK_NO_PROC_POOL
813   failed(diePe);        // add into the list of failed pes
814 #endif
815   thisFailedPe = diePe;
816
817   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
818
819   inRestarting = 1;
820                                                                                 
821   // disable load balancer's barrier
822   if (CkMyPe() != diePe) resetLB(diePe);
823
824   CKLOCMGR_LOOP(mgr->startInserting(););
825
826   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
827   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
828 /*
829   if (CkMyPe() == 0)
830     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
831 */
832 #endif
833 }
834
835 // loally remove all array elements
836 void CkMemCheckPT::removeArrayElements()
837 {
838 #if CMK_MEM_CHECKPOINT
839   int len = ckTable.length();
840   double curTime = CmiWallTimer();
841   if (CkMyPe() == thisFailedPe) 
842     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
843   stage = (char*)"removeArrayElements";
844   startTime = curTime;
845
846   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
847   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
848
849   // get rid of all buffering and remote recs
850   // including destorying all array elements
851 #if CK_NO_PROC_POOL  
852         CKLOCMGR_LOOP(mgr->flushAllRecs(););
853 #else
854         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
855 #endif
856 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
857
858   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
859   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
860 #endif
861 }
862
863 // flush state in reduction manager
864 void CkMemCheckPT::resetReductionMgr()
865 {
866   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
867   int numGroups = CkpvAccess(_groupIDTable)->size();
868   for(int i=0;i<numGroups;i++) {
869     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
870     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
871     obj->flushStates();
872     obj->ckJustMigrated();
873   }
874   // reset again
875   //CpvAccess(_qd)->flushStates();
876
877 #if 1
878   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
879   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
880 #else
881   if (CkMyPe() == 0)
882     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
883 #endif
884 }
885
886 // recover the lost buddies
887 void CkMemCheckPT::recoverBuddies()
888 {
889   int idx;
890   int len = ckTable.length();
891   // ready to flush reduction manager
892   // cannot be CkMemCheckPT::restart because destory will modify states
893   double curTime = CmiWallTimer();
894   if (CkMyPe() == thisFailedPe)
895   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
896   stage = (char *)"recoverBuddies";
897   if (CkMyPe() == thisFailedPe)
898   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
899   startTime = curTime;
900
901   // recover buddies
902   expectCount = 0;
903 #if !CMK_CHKP_ALL
904   for (idx=0; idx<len; idx++) {
905     CkCheckPTInfo *entry = ckTable[idx];
906     if (entry->pNo == thisFailedPe) {
907 #if CK_NO_PROC_POOL
908       // find a new buddy
909       int budPe = BuddyPE(CkMyPe());
910 #else
911       int budPe = thisFailedPe;
912 #endif
913       CkArrayCheckPTMessage *msg = entry->getCopy();
914       msg->bud1 = budPe;
915       msg->bud2 = CkMyPe();
916       msg->cp_flag = 0;            // not checkpointing
917       thisProxy[budPe].recoverEntry(msg);
918       expectCount ++;
919     }
920   }
921 #else
922   //send to failed pe
923   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
924 #if CK_NO_PROC_POOL
925       // find a new buddy
926       int budPe = BuddyPE(CkMyPe());
927 #else
928       int budPe = thisFailedPe;
929 #endif
930       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
931       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
932           msg->cp_flag = 0;            // not checkpointing
933       msg->bud1 = budPe;
934       msg->bud2 = CkMyPe();
935       thisProxy[budPe].recoverEntry(msg);
936       expectCount ++;
937   }
938 #endif
939
940 #if 1
941   if (expectCount == 0) {
942     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
943     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
944   }
945 #else
946   if (CkMyPe() == 0) {
947     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
948   }
949 #endif
950
951   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
952 }
953
954 void CkMemCheckPT::gotData()
955 {
956   ackCount ++;
957   if (ackCount == expectCount) {
958     ackCount = 0;
959     expectCount = -1;
960     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
961     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
962   }
963 }
964
965 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
966 {
967
968   for (int i=0; i<n; i++) {
969     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
970     mgr->updateLocation(idx[i], nowOnPe);
971   }
972         thisProxy[nowOnPe].gotReply();
973 }
974
975 // restore array elements
976 void CkMemCheckPT::recoverArrayElements()
977 {
978   double curTime = CmiWallTimer();
979   int len = ckTable.length();
980   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
981   stage = (char *)"recoverArrayElements";
982   if (CkMyPe() == thisFailedPe)
983   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
984   startTime = curTime;
985  int flag = 0;
986   // recover all array elements
987   int count = 0;
988
989 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
990   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
991   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
992 #endif
993
994 #if !CMK_CHKP_ALL
995   for (int idx=0; idx<len; idx++)
996   {
997     CkCheckPTInfo *entry = ckTable[idx];
998 #if CK_NO_PROC_POOL
999     // the bigger one will do 
1000 //    if (CkMyPe() < entry->pNo) continue;
1001     if (!isMaster(entry->pNo)) continue;
1002 #else
1003     // smaller one do it, which has the original object
1004     if (CkMyPe() == entry->pNo+1 || 
1005         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1006 #endif
1007 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1008
1009     entry->updateBuddy(CkMyPe(), entry->pNo);
1010     CkArrayCheckPTMessage *msg = entry->getCopy();
1011     // gzheng
1012     //thisProxy[CkMyPe()].inmem_restore(msg);
1013     inmem_restore(msg);
1014 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1015     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1016     int homePe = mgr->homePe(msg->index);
1017     if (homePe != CkMyPe()) {
1018       gmap[homePe].push_back(msg->locMgr);
1019       imap[homePe].push_back(msg->index);
1020     }
1021 #endif
1022     CkFreeMsg(msg);
1023     count ++;
1024   }
1025 #else
1026         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1027 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1028         recoverAll(msg,gmap,imap);
1029 #else
1030         recoverAll(msg);
1031 #endif
1032     CkFreeMsg(msg);
1033 #endif
1034   curTime = CmiWallTimer();
1035   if (CkMyPe() == thisFailedPe)
1036         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1037 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1038   for (int i=0; i<CkNumPes(); i++) {
1039     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1040       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1041         flag++; 
1042           }
1043   }
1044   delete [] imap;
1045   delete [] gmap;
1046 #endif
1047   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1048
1049   CKLOCMGR_LOOP(mgr->doneInserting(););
1050
1051   // _crashedNode = -1;
1052   CpvAccess(_crashedNode) = -1;
1053   inRestarting = 0;
1054 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1055   if (CkMyPe() == 0)
1056     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1057 #else
1058 if(flag == 0)
1059 {
1060     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1061 }
1062 #endif
1063 }
1064
1065 void CkMemCheckPT::gotReply(){
1066     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1067 }
1068
1069 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1070 #if CMK_CHKP_ALL
1071         PUP::fromMem p(msg->packData);
1072         int numElements;
1073         p|numElements;
1074         if(p.isUnpacking()){
1075                 for(int i=0;i<numElements;i++){
1076                         CkGroupID gID;
1077                         CkArrayIndex idx;
1078                         p|gID;
1079                         p|idx;
1080                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1081                         int homePe = mgr->homePe(idx);
1082 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1083                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1084 #else
1085                         mgr->resume(idx,p,CmiFalse,CmiTrue);
1086 #endif
1087 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1088                         homePe = mgr->homePe(idx);
1089                         if (homePe != CkMyPe()) {
1090                           gmap[homePe].push_back(gID);
1091                           imap[homePe].push_back(idx);
1092                         }
1093 #endif
1094                 }
1095         }
1096         if(CkMyPe()==thisFailedPe)
1097         CkPrintf("recover all ends\n"); 
1098 #endif
1099 }
1100
1101 static double restartT;
1102
1103 // on every processor
1104 // turn load balancer back on
1105 void CkMemCheckPT::finishUp()
1106 {
1107   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1108   //CKLOCMGR_LOOP(mgr->doneInserting(););
1109   
1110   if (CkMyPe() == thisFailedPe)
1111   {
1112        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1113        //CkStartQD(cpCallback);
1114        cpCallback.send();
1115        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1116   }
1117
1118 #if CK_NO_PROC_POOL
1119 #if NODE_CHECKPOINT
1120   int numnodes = CmiNumPhysicalNodes();
1121 #else
1122   int numnodes = CkNumPes();
1123 #endif
1124   if (numnodes-totalFailed() <=2) {
1125     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1126     _memChkptOn = 0;
1127   }
1128 #endif
1129 }
1130
1131 // called only on 0
1132 void CkMemCheckPT::quiescence(CkCallback &cb)
1133 {
1134   static int pe_count = 0;
1135   pe_count ++;
1136   CmiAssert(CkMyPe() == 0);
1137   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1138   if (pe_count == CkNumPes()) {
1139     pe_count = 0;
1140     cb.send();
1141   }
1142 }
1143
1144 // User callable function - to start a checkpoint
1145 // callback cb is used to pass control back
1146 void CkStartMemCheckpoint(CkCallback &cb)
1147 {
1148 #if CMK_MEM_CHECKPOINT
1149   if (_memChkptOn == 0) {
1150     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1151     cb.send();
1152     return;
1153   }
1154   if (CkInRestarting()) {
1155       // trying to checkpointing during restart
1156     cb.send();
1157     return;
1158   }
1159     // store user callback and user data
1160   CkMemCheckPT::cpCallback = cb;
1161
1162     // broadcast to start check pointing
1163   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1164   checkptMgr.doItNow(CkMyPe(), cb);
1165 #else
1166   // when mem checkpoint is disabled, invike cb immediately
1167   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1168   cb.send();
1169 #endif
1170 }
1171
1172 void CkRestartCheckPoint(int diePe)
1173 {
1174 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1175   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1176   // broadcast
1177   checkptMgr.restart(diePe);
1178 }
1179
1180 static int _diePE = -1;
1181
1182 // callback function used locally by ccs handler
1183 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1184 {
1185 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1186   CkRestartCheckPoint(_diePE);
1187 }
1188
1189 // Converse function handles
1190 static int askPhaseHandlerIdx;
1191 static int recvPhaseHandlerIdx;
1192 static int askProcDataHandlerIdx;
1193 static int restartBcastHandlerIdx;
1194 static int recoverProcDataHandlerIdx;
1195 static int restartBeginHandlerIdx;
1196 static int notifyHandlerIdx;
1197
1198 // called on crashed PE
1199 static void restartBeginHandler(char *msg)
1200 {
1201   CmiFree(msg);
1202 #if CMK_MEM_CHECKPOINT
1203 #if CMK_USE_BARRIER
1204         if(CkMyPe()!=_diePE){
1205                 printf("restar begin on %d\n",CkMyPe());
1206                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1207                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1208                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1209         }else{
1210         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1211         CkRestartCheckPointCallback(NULL, NULL);
1212         }
1213 #else
1214   static int count = 0;
1215   CmiAssert(CkMyPe() == _diePE);
1216   count ++;
1217   if (count == CkNumPes()) {
1218     CkRestartCheckPointCallback(NULL, NULL);
1219     count = 0;
1220   }
1221 #endif
1222 #endif
1223 }
1224
1225 extern void _discard_charm_message();
1226 extern void _resume_charm_message();
1227
1228 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1229         return data;
1230 }
1231
1232 static void restartBcastHandler(char *msg)
1233 {
1234 #if CMK_MEM_CHECKPOINT
1235   // advance phase counter
1236   CkMemCheckPT::inRestarting = 1;
1237   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1238   // gzheng
1239   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1240
1241   if (CkMyPe()==_diePE)
1242     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1243
1244   // reset QD counters
1245 /*  gzheng
1246   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1247 */
1248
1249 /*  gzheng
1250   if (CkMyPe()==_diePE)
1251       CkRestartCheckPointCallback(NULL, NULL);
1252 */
1253   CmiFree(msg);
1254
1255   _resume_charm_message();
1256
1257     // reduction
1258   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1259   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1260 #if CMK_USE_BARRIER
1261         //CmiPrintf("before reduce\n"); 
1262         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1263         //CmiPrintf("after reduce\n");  
1264 #else
1265   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1266 #endif 
1267  checkpointed = 0;
1268 #endif
1269 }
1270
1271 extern void _initDone();
1272
1273 // called on crashed processor
1274 static void recoverProcDataHandler(char *msg)
1275 {
1276 #if CMK_MEM_CHECKPOINT
1277    int i;
1278    envelope *env = (envelope *)msg;
1279    CkUnpackMessage(&env);
1280    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1281    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1282    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1283    //cur_restart_phase ++;
1284      // gzheng ?
1285    //CpvAccess(_qd)->flushStates();
1286
1287    // restore readonly, mainchare, group, nodegroup
1288 //   int temp = cur_restart_phase;
1289 //   cur_restart_phase = -1;
1290    PUP::fromMem p(procMsg->packData);
1291    _handleProcData(p);
1292
1293    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1294    // gzheng
1295    CKLOCMGR_LOOP(mgr->startInserting(););
1296
1297    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1298    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1299    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1300    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1301
1302    _initDone();
1303 //   CpvAccess(_qd)->flushStates();
1304    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1305 #endif
1306 }
1307
1308 // called on its backup processor
1309 // get backup message buffer and sent to crashed processor
1310 static void askProcDataHandler(char *msg)
1311 {
1312 #if CMK_MEM_CHECKPOINT
1313     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1314     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1315     if (CpvAccess(procChkptBuf) == NULL)  {
1316       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1317       CkAbort("no checkpoint found");
1318     }
1319     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1320     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1321
1322     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1323
1324     CkPackMessage(&env);
1325     CmiSetHandler(env, recoverProcDataHandlerIdx);
1326     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1327     CpvAccess(procChkptBuf) = NULL;
1328     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1329 #endif
1330 }
1331
1332 // called on PE 0
1333 void qd_callback(void *m)
1334 {
1335    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1336    CkFreeMsg(m);
1337 #ifdef CMK_SMP
1338    for(int i=0;i<CmiMyNodeSize();i++){
1339    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1340    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1341         CmiSetHandler(msg, askProcDataHandlerIdx);
1342         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1343         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1344    }
1345    return;
1346 #endif
1347    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1348    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1349    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1350    CmiSetHandler(msg, askProcDataHandlerIdx);
1351    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1352    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1353
1354 }
1355
1356 // on crashed node
1357 void CkMemRestart(const char *dummy, CkArgMsg *args)
1358 {
1359 #if CMK_MEM_CHECKPOINT
1360    _diePE = CmiMyNode();
1361    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1362    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1363    CkMemCheckPT::inRestarting = 1;
1364
1365   CpvAccess( _crashedNode )= CmiMyNode();
1366         
1367   _discard_charm_message();
1368     restartT = CmiWallTimer();
1369    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1370   
1371   /*if(CmiMyRank()==0){
1372     CkCallback cb(qd_callback);
1373     CkStartQD(cb);
1374     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1375   }*/
1376    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1377    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1378    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1379    CmiSetHandler(msg, askProcDataHandlerIdx);
1380    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1381    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1382 #else
1383    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1384 #endif
1385 }
1386
1387 // can be called in other files
1388 // return true if it is in restarting
1389 extern "C"
1390 int CkInRestarting()
1391 {
1392 #if CMK_MEM_CHECKPOINT
1393   if (CpvAccess( _crashedNode)!=-1) return 1;
1394   // gzheng
1395   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1396   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1397   return CkMemCheckPT::inRestarting;
1398 #else
1399   return 0;
1400 #endif
1401 }
1402
1403 extern "C"
1404 void CkSetInLdb(){
1405 #if CMK_MEM_CHECKPOINT
1406         CkMemCheckPT::inLoadbalancing = 1;
1407 #endif
1408 }
1409
1410 extern "C"
1411 int CkInLdb(){
1412 #if CMK_MEM_CHECKPOINT
1413         return CkMemCheckPT::inLoadbalancing;
1414 #endif
1415         return 0;
1416 }
1417
1418 extern "C"
1419 void CkResetInLdb(){
1420 #if CMK_MEM_CHECKPOINT
1421         CkMemCheckPT::inLoadbalancing = 0;
1422 #endif
1423 }
1424
1425 /*****************************************************************************
1426                 module initialization
1427 *****************************************************************************/
1428
1429 static int arg_where = CkCheckPoint_inMEM;
1430
1431 #if CMK_MEM_CHECKPOINT
1432 void init_memcheckpt(char **argv)
1433 {
1434     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1435       arg_where = CkCheckPoint_inDISK;
1436     }
1437
1438         // initiliazing _crashedNode variable
1439         CpvInitialize(int, _crashedNode);
1440         CpvAccess(_crashedNode) = -1;
1441
1442 }
1443 #endif
1444
1445 class CkMemCheckPTInit: public Chare {
1446 public:
1447   CkMemCheckPTInit(CkArgMsg *m) {
1448 #if CMK_MEM_CHECKPOINT
1449     if (arg_where == CkCheckPoint_inDISK) {
1450       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1451     }
1452     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1453     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1454 #endif
1455   }
1456 };
1457
1458 static void notifyHandler(char *msg)
1459 {
1460 #if CMK_MEM_CHECKPOINT
1461   CmiFree(msg);
1462       /* immediately increase restart phase to filter old messages */
1463   CpvAccess(_curRestartPhase) ++;
1464   CpvAccess(_qd)->flushStates();
1465   _discard_charm_message();
1466
1467 #endif
1468 }
1469
1470 extern "C"
1471 void notify_crash(int node)
1472 {
1473 #ifdef CMK_MEM_CHECKPOINT
1474   CpvAccess( _crashedNode) = node;
1475 #ifdef CMK_SMP
1476   for(int i=0;i<CkMyNodeSize();i++){
1477         CpvAccessOther(_crashedNode,i)=node;
1478   }
1479 #endif
1480   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1481   CkMemCheckPT::inRestarting = 1;
1482
1483     // this may be in interrupt handler, send a message to reset QD
1484   int pe = CmiNodeFirst(CkMyNode());
1485   for(int i=0;i<CkMyNodeSize();i++){
1486         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1487         CmiSetHandler(msg, notifyHandlerIdx);
1488         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1489   }
1490 #endif
1491 }
1492
1493 extern "C" void (*notify_crash_fn)(int node);
1494
1495 #if CMK_CONVERSE_MPI
1496 static int pingHandlerIdx;
1497 static int pingCheckHandlerIdx;
1498 static int buddyDieHandlerIdx;
1499 static double lastPingTime = -1;
1500
1501 extern "C" void mpi_restart_crashed(int pe, int rank);
1502 extern "C" int  find_spare_mpirank(int pe);
1503
1504 void pingBuddy();
1505 void pingCheckHandler();
1506
1507 void buddyDieHandler(char *msg)
1508 {
1509 #if CMK_MEM_CHECKPOINT
1510    // notify
1511    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1512    notify_crash(diepe);
1513    // send message to crash pe to let it restart
1514    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1515    int newrank = find_spare_mpirank(diepe);
1516    int buddy = obj->BuddyPE(CmiMyPe());
1517    if (buddy == diepe)  {
1518      mpi_restart_crashed(diepe, newrank);
1519      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1520    }
1521 #endif
1522 }
1523
1524 void pingHandler(void *msg)
1525 {
1526   lastPingTime = CmiWallTimer();
1527   CmiFree(msg);
1528 }
1529
1530 void pingCheckHandler()
1531 {
1532 #if CMK_MEM_CHECKPOINT
1533   double now = CmiWallTimer();
1534   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1535     int i, pe, buddy;
1536     // tell everyone the buddy dies
1537     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1538     for (i = 1; i < CmiNumPes(); i++) {
1539        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1540        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1541     }
1542     buddy = pe;
1543     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1544     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1545       if (obj->isFailed(pe) || pe == buddy) continue;
1546       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1547       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1548       CmiSetHandler(msg, buddyDieHandlerIdx);
1549       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1550     }*/
1551     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1552     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1553     CmiSetHandler(msg, buddyDieHandlerIdx);
1554     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1555   }
1556   else 
1557     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1558 #endif
1559 }
1560
1561 void pingBuddy()
1562 {
1563 #if CMK_MEM_CHECKPOINT
1564   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1565   if (obj) {
1566     int buddy = obj->BuddyPE(CkMyPe());
1567 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1568     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1569     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1570     CmiSetHandler(msg, pingHandlerIdx);
1571     CmiGetRestartPhase(msg) = 9999;
1572     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1573   }
1574   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1575 #endif
1576 }
1577 #endif
1578
1579 // initproc
1580 void CkRegisterRestartHandler( )
1581 {
1582 #if CMK_MEM_CHECKPOINT
1583   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1584   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1585   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1586   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1587   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1588
1589 #if CMK_CONVERSE_MPI
1590   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1591   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1592   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1593 #endif
1594
1595   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1596   CpvAccess(procChkptBuf) = NULL;
1597
1598   notify_crash_fn = notify_crash;
1599
1600 #if ! CMK_CONVERSE_MPI
1601   // print pid to kill
1602 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1603 //  sleep(4);
1604 #endif
1605 #endif
1606 }
1607
1608
1609 extern "C"
1610 int CkHasCheckpoints()
1611 {
1612   return checkpointed;
1613 }
1614
1615 /// @todo: the following definitions should be moved to a separate file containing
1616 // structures and functions about fault tolerance strategies
1617
1618 /**
1619  *  * @brief: function for killing a process                                             
1620  *   */
1621 #ifdef CMK_MEM_CHECKPOINT
1622 #if CMK_HAS_GETPID
1623 void killLocal(void *_dummy,double curWallTime){
1624         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1625         if(CmiWallTimer()<killTime-1){
1626                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1627         }else{ 
1628 #if CMK_CONVERSE_MPI
1629                                 CkDieNow();
1630 #else 
1631                 kill(getpid(),SIGKILL);                                               
1632 #endif
1633         }              
1634
1635 #else
1636 void killLocal(void *_dummy,double curWallTime){
1637   CmiAbort("kill() not supported!");
1638 }
1639 #endif
1640 #endif
1641
1642 #ifdef CMK_MEM_CHECKPOINT
1643 /**
1644  * @brief: reads the file with the kill information
1645  */
1646 void readKillFile(){
1647         FILE *fp=fopen(killFile,"r");
1648         if(!fp){
1649                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1650                 return;
1651         }
1652         int proc;
1653         double sec;
1654         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1655                 if(proc == CkMyNode() && CkMyRank() == 0){
1656                         killTime = CmiWallTimer()+sec;
1657                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1658                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1659                 }
1660         }
1661         fclose(fp);
1662 }
1663
1664 #if ! CMK_CONVERSE_MPI
1665 void CkDieNow()
1666 {
1667 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1668          // ignored for non-mpi version
1669         CmiPrintf("[%d] die now.\n", CmiMyPe());
1670         killTime = CmiWallTimer()+0.001;
1671         CcdCallFnAfter(killLocal,NULL,1);
1672 #endif
1673 }
1674 #endif
1675
1676 #endif
1677
1678 #include "CkMemCheckpoint.def.h"
1679
1680