recover from soft failure
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74
75 // static, so that it is accessible from Converse part
76 int CkMemCheckPT::inRestarting = 0;
77 int CkMemCheckPT::inLoadbalancing = 0;
78 double CkMemCheckPT::startTime;
79 char *CkMemCheckPT::stage;
80 CkCallback CkMemCheckPT::cpCallback;
81
82 int _memChkptOn = 1;                    // checkpoint is on or off
83
84 CkGroupID ckCheckPTGroupID;             // readonly
85
86 static int checkpointed = 0;
87
88 /// @todo the following declarations should be moved into a separate file for all 
89 // fault tolerant strategies
90
91 #ifdef CMK_MEM_CHECKPOINT
92 // name of the kill file that contains processes to be killed 
93 char *killFile;                                               
94 // flag for the kill file         
95 int killFlag=0;
96 // variable for storing the killing time
97 double killTime=0.0;
98 #endif
99
100 #ifdef CKLOCMGR_LOOP
101 #undef CKLOCMGR_LOOP
102 #endif
103 // loop over all CkLocMgr and do "code"
104 #define  CKLOCMGR_LOOP(code)    {       \
105   int numGroups = CkpvAccess(_groupIDTable)->size();    \
106   for(int i=0;i<numGroups;i++) {        \
107     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
108     if(obj->isLocMgr())  {      \
109       CkLocMgr *mgr = (CkLocMgr*)obj;   \
110       code      \
111     }   \
112   }     \
113  }
114
115 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
116 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
117 CpvDeclare(CkCheckPTMessage**, chkpBuf);
118 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
119 //store the checkpoint of the buddy to compare
120 //do not need the whole msg, can be the checksum
121 CpvDeclare(CkCheckPTMessage*, buddyBuf);
122 //pointer of the checkpoint going to be written
123 CpvDeclare(int, curPointer);
124 CpvDeclare(int, recvdRemote);
125 CpvDeclare(int, recvdLocal);
126
127 bool compare(char * buf1, char * buf2);
128 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
129 // Converse function handles
130 static int askPhaseHandlerIdx;
131 static int recvPhaseHandlerIdx;
132 static int askProcDataHandlerIdx;
133 static int restartBcastHandlerIdx;
134 static int recoverProcDataHandlerIdx;
135 static int restartBeginHandlerIdx;
136 static int recvRemoteChkpHandlerIdx;
137 static int notifyHandlerIdx;
138 // compute the backup processor
139 // FIXME: avoid crashed processors
140 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
141
142 inline int CkMemCheckPT::BuddyPE(int pe)
143 {
144   int budpe;
145 #if NODE_CHECKPOINT
146     // buddy is the processor with same rank on the next physical node
147   int r1 = CmiPhysicalRank(pe);
148   int budnode = CmiPhysicalNodeID(pe);
149   do {
150     budnode = (budnode+1)%CmiNumPhysicalNodes();
151     int *pelist;
152     int num;
153     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
154     budpe = pelist[r1 % num];
155   } while (isFailed(budpe));
156   if (budpe == pe) {
157     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
158     CmiAbort("Failed to find a buddy processor");
159   }
160 #else
161   budpe = pe;
162   while (budpe == pe || isFailed(budpe)) 
163           budpe = (budpe+1)%CkNumPes();
164 #endif
165   return budpe;
166 }
167
168 // called in array element constructor
169 // choose and register with 2 buddies for checkpoiting 
170 #if CMK_MEM_CHECKPOINT
171 void ArrayElement::init_checkpt() {
172         if (_memChkptOn == 0) return;
173         if (CkInRestarting()) {
174           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
175         }
176         // only master init checkpoint
177         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
178
179         budPEs[0] = CkMyPe();
180         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
181         CmiAssert(budPEs[0] != budPEs[1]);
182         // inform checkPTMgr
183         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
184         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
185         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
186         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
187 }
188 #endif
189
190 // entry function invoked by checkpoint mgr asking for checkpoint data
191 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
192 #if CMK_MEM_CHECKPOINT
193 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
194 //char index[128];   thisIndexMax.sprint(index);
195 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
196   CkLocMgr *locMgr = thisArray->getLocMgr();
197   CmiAssert(myRec!=NULL);
198   int size;
199   {
200         PUP::sizer p;
201         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
202         size = p.size();
203   }
204   int packSize = size/sizeof(double) +1;
205   CkArrayCheckPTMessage *msg =
206                  new (packSize, 0) CkArrayCheckPTMessage;
207   msg->len = size;
208   msg->index =thisIndexMax;
209   msg->aid = thisArrayID;
210   msg->locMgr = locMgr->getGroupID();
211   msg->cp_flag = 1;
212   {
213         PUP::toMem p(msg->packData);
214         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
215   }
216
217   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
218   checkptMgr.recvData(msg, 2, budPEs);
219   delete m;
220 #endif
221 }
222
223 // checkpoint holder class - for memory checkpointing
224 class CkMemCheckPTInfo: public CkCheckPTInfo
225 {
226   CkArrayCheckPTMessage *ckBuffer;
227 public:
228   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
229                     CkCheckPTInfo(a, loc, idx, pno)
230   {
231     ckBuffer = NULL;
232   }
233   ~CkMemCheckPTInfo() 
234   {
235     if (ckBuffer) delete ckBuffer; 
236   }
237   inline void updateBuffer(CkArrayCheckPTMessage *data) 
238   {
239     CmiAssert(data!=NULL);
240     if (ckBuffer) delete ckBuffer;
241     ckBuffer = data;
242   }    
243   inline CkArrayCheckPTMessage * getCopy()
244   {
245     if (ckBuffer == NULL) {
246       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
247       CmiAbort("Abort!");
248     }
249     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
250   }     
251   inline void updateBuddy(int b1, int b2) {
252      CmiAssert(ckBuffer);
253      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
254      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
255      CmiAssert(pNo != CkMyPe());
256   }
257   inline int getSize() { 
258      CmiAssert(ckBuffer);
259      return ckBuffer->len; 
260   }
261 };
262
263 // checkpoint holder class - for in-disk checkpointing
264 class CkDiskCheckPTInfo: public CkCheckPTInfo 
265 {
266   char *fname;
267   int bud1, bud2;
268   int len;                      // checkpoint size
269 public:
270   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
271   {
272 #if CMK_USE_MKSTEMP
273     fname = new char[64];
274     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
275     mkstemp(fname);
276 #else
277     fname=tmpnam(NULL);
278 #endif
279     bud1 = bud2 = -1;
280     len = 0;
281   }
282   ~CkDiskCheckPTInfo() 
283   {
284     remove(fname);
285   }
286   inline void updateBuffer(CkArrayCheckPTMessage *data) 
287   {
288     double t = CmiWallTimer();
289     // unpack it
290     envelope *env = UsrToEnv(data);
291     CkUnpackMessage(&env);
292     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
293     FILE *f = fopen(fname,"wb");
294     PUP::toDisk p(f);
295     CkPupMessage(p, (void **)&data);
296     // delay sync to the end because otherwise the messages are blocked
297 //    fsync(fileno(f));
298     fclose(f);
299     bud1 = data->bud1;
300     bud2 = data->bud2;
301     len = data->len;
302     delete data;
303     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
304   }
305   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
306   {
307     CkArrayCheckPTMessage *data;
308     FILE *f = fopen(fname,"rb");
309     PUP::fromDisk p(f);
310     CkPupMessage(p, (void **)&data);
311     fclose(f);
312     data->bud1 = bud1;                          // update the buddies
313     data->bud2 = bud2;
314     return data;
315   }
316   inline void updateBuddy(int b1, int b2) {
317      bud1 = b1; bud2 = b2;
318      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
319      CmiAssert(pNo != CkMyPe());
320   }
321   inline int getSize() { 
322      return len; 
323   }
324 };
325
326 CkMemCheckPT::CkMemCheckPT(int w)
327 {
328   int numnodes = 0;
329 #if NODE_CHECKPOINT
330   numnodes = CmiNumPhysicalNodes();
331 #else
332   numnodes = CkNumPes();
333 #endif
334 #if CK_NO_PROC_POOL
335   if (numnodes <= 2)
336 #else
337   if (numnodes  == 1)
338 #endif
339   {
340     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
341     _memChkptOn = 0;
342   }
343   inRestarting = 0;
344   recvCount = peCount = 0;
345   ackCount = 0;
346   expectCount = -1;
347   where = w;
348
349 #if CMK_CONVERSE_MPI
350   void pingBuddy();
351   void pingCheckHandler();
352   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
353   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
354 #endif
355   chkpTable[0] = NULL;
356   chkpTable[1] = NULL;
357 }
358
359 CkMemCheckPT::~CkMemCheckPT()
360 {
361   int len = ckTable.length();
362   for (int i=0; i<len; i++) {
363     delete ckTable[i];
364   }
365 }
366
367 void CkMemCheckPT::pup(PUP::er& p) 
368
369   CBase_CkMemCheckPT::pup(p); 
370   p|cpStarter;
371   p|thisFailedPe;
372   p|failedPes;
373   p|ckCheckPTGroupID;           // recover global variable
374   p|cpCallback;                 // store callback
375   p|where;                      // where to checkpoint
376   p|peCount;
377   if (p.isUnpacking()) {
378     recvCount = 0;
379 #if CMK_CONVERSE_MPI
380     void pingBuddy();
381     void pingCheckHandler();
382     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
383     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
384 #endif
385   }
386 }
387
388 // called by checkpoint mgr to restore an array element
389 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
390 {
391 #if CMK_MEM_CHECKPOINT
392   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
393   // m->index.print();
394   PUP::fromMem p(m->packData);
395   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
396   CmiAssert(mgr);
397 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
398   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
399 #else
400   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
401 #endif
402
403   // find a list of array elements bound together
404   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
405   CmiAssert(elt);
406   CkLocRec_local *rec = elt->myRec;
407   CkVec<CkMigratable *> list;
408   mgr->migratableList(rec, list);
409   CmiAssert(list.length() > 0);
410   for (int l=0; l<list.length(); l++) {
411     elt = (ArrayElement *)list[l];
412     elt->budPEs[0] = m->bud1;
413     elt->budPEs[1] = m->bud2;
414     //    reset, may not needed now
415     // for now.
416     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
417       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
418       if (c) c->redNo = 0;
419     }
420   }
421 #endif
422 }
423
424 // return 1 if pe is a crashed processor
425 int CkMemCheckPT::isFailed(int pe)
426 {
427   for (int i=0; i<failedPes.length(); i++)
428     if (failedPes[i] == pe) return 1;
429   return 0;
430 }
431
432 // add pe into history list of all failed processors
433 void CkMemCheckPT::failed(int pe)
434 {
435   if (isFailed(pe)) return;
436   failedPes.push_back(pe);
437 }
438
439 int CkMemCheckPT::totalFailed()
440 {
441   return failedPes.length();
442 }
443
444 // create an checkpoint entry for array element of aid with index.
445 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
446 {
447   // error check, no duplicate
448   int idx, len = ckTable.size();
449   for (idx=0; idx<len; idx++) {
450     CkCheckPTInfo *entry = ckTable[idx];
451     if (index == entry->index) {
452       if (loc == entry->locMgr) {
453           // bindTo array elements
454           return;
455       }
456         // for array inheritance, the following check may fail
457         // because ArrayElement constructor of all superclasses are called
458       if (aid == entry->aid) {
459         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
460         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
461       }
462     }
463   }
464   CkCheckPTInfo *newEntry;
465   if (where == CkCheckPoint_inMEM)
466     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
467   else
468     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
469   ckTable.push_back(newEntry);
470   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
471 }
472
473 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
474 {
475 #if !CMK_CHKP_ALL       
476   int buddy = msg->bud1;
477   if (buddy == CkMyPe()) buddy = msg->bud2;
478   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
479   recvData(msg);
480     // ack
481   thisProxy[buddy].gotData();
482 #else
483   chkpTable[0] = NULL;
484   chkpTable[1] = NULL;
485   recvArrayCheckpoint(msg);
486   thisProxy[msg->bud2].gotData();
487 #endif
488 }
489
490 // loop through my checkpoint table and ask checkpointed array elements
491 // to send me checkpoint data.
492 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
493 {
494   checkpointed = 1;
495   cpCallback = cb;
496   cpStarter = starter;
497   if (CkMyPe() == cpStarter) {
498     startTime = CmiWallTimer();
499     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
500   }
501 #if CMK_CONVERSE_MPI
502   if(CmiNumPartition()==1)
503 #endif    
504   {
505
506 #if !CMK_CHKP_ALL
507   int len = ckTable.length();
508   for (int i=0; i<len; i++) {
509     CkCheckPTInfo *entry = ckTable[i];
510       // always let the bigger number processor send request
511     //if (CkMyPe() < entry->pNo) continue;
512       // always let the smaller number processor send request, may on same proc
513     if (!isMaster(entry->pNo)) continue;
514       // call inmem_checkpoint to the array element, ask it to send
515       // back checkpoint data via recvData().
516     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
517     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
518   }
519     // if my table is empty, then I am done
520   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
521 #else
522   startArrayCheckpoint();
523 #endif
524         sendProcData();
525   }
526 #if CMK_CONVERSE_MPI
527   else
528   {
529         startCheckpoint();
530   }
531 #endif
532   // pack and send proc level data
533 }
534
535 class MemElementPacker : public CkLocIterator{
536         private:
537                 CkLocMgr *locMgr;
538                 PUP::er &p;
539         public:
540                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
541                 void addLocation(CkLocation &loc){
542                         CkArrayIndexMax idx = loc.getIndex();
543                         CkGroupID gID = locMgr->ckGetGroupID();
544                         p|gID;
545                         p|idx;
546                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
547                 }
548 };
549
550 void CkMemCheckPT::pupAllElements(PUP::er &p){
551 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
552         int numElements;
553         if(!p.isUnpacking()){
554                 numElements = CkCountArrayElements();
555         }
556         p | numElements;
557         if(!p.isUnpacking()){
558                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
559         }
560 #endif
561 }
562
563 void CkMemCheckPT::startArrayCheckpoint(){
564 #if CMK_CHKP_ALL
565         int size;
566         {
567                 PUP::sizer psizer;
568                 pupAllElements(psizer);
569                 size = psizer.size();
570         }
571         int packSize = size/sizeof(double)+1;
572  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
573         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
574         msg->len = size;
575         msg->cp_flag = 1;
576         int budPEs[2];
577         msg->bud1=CkMyPe();
578         msg->bud2=ChkptOnPe(CkMyPe());
579         {
580                 PUP::toMem p(msg->packData);
581                 pupAllElements(p);
582         }
583         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
584         if(chkpTable[0]) delete chkpTable[0];
585         chkpTable[0] = msg;
586         //send the checkpoint to my 
587         recvCount++;
588 #endif
589 }
590
591 void CkMemCheckPT::startCheckpoint(){
592 #if CMK_CONVERSE_MPI
593   int size;
594   {
595     PUP::sizer p;
596     _handleProcData(p,CmiFalse);
597     size = p.size();
598   }
599         int packSize = size/sizeof(double)+1;
600         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
601         procMsg->len = size;
602         procMsg->cp_flag = 1;
603         {
604                 PUP::toMem p(procMsg->packData);
605                 _handleProcData(p,CmiFalse);
606         }
607         int pointer = CpvAccess(curPointer);
608         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
609                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
610         
611         {
612                 PUP::sizer psizer;
613                 pupAllElements(psizer);
614                 size = psizer.size();
615         }
616         packSize = size/sizeof(double)+1;
617         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
618         msg->len = size;
619         msg->cp_flag = 1;
620         {
621                 PUP::toMem p(msg->packData);
622                 pupAllElements(p);
623         }
624         pointer = CpvAccess(curPointer);
625         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
626                 CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
627         CpvAccess(recvdLocal) = 1;
628
629         envelope * env = (envelope *)(UsrToEnv(msg));
630         CkPackMessage(&env);
631         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
632         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
633         if(CpvAccess(recvdRemote)==1){
634                 //compare the checkpoint 
635           int size = CpvAccess(chkpBuf)[pointer]->len;
636           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
637                 thisProxy[CkMyPe()].doneComparison(true);
638           }else{
639                 thisProxy[CkMyPe()].doneComparison(true);
640           }
641         }
642 #endif
643 }
644
645 void CkMemCheckPT::doneComparison(bool ret){
646         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
647         contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
648 }
649
650 void CkMemCheckPT::doneRComparison(bool ret){
651         CpvAccess(recvdRemote) = 0;
652         CpvAccess(recvdLocal) = 0;
653 //      if(CpvAccess(curPointer) == 0){
654         if(ret==true){
655         CpvAccess(curPointer)^=1;
656                 if(CkMyPe() == 0){
657                         cpCallback.send();
658                 }
659         }else{
660                 RollBack();
661         }
662 }
663
664 void CkMemCheckPT::RollBack(){
665         //restore group data
666         CkMemCheckPT::inRestarting = 1;
667         int pointer = CpvAccess(curPointer)^1;//use the previous one
668     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
669         PUP::fromMem p(chkpMsg->packData);      
670         //_handleProcData(p);
671         
672         //destroy array elements
673         //CKLOCMGR_LOOP(mgr->flushLocalRecs(););
674           int numGroups = CkpvAccess(_groupIDTable)->size();
675           for(int i=0;i<numGroups;i++) {
676                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
677                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
678                 obj->flushStates();
679                 obj->ckJustMigrated();
680           }
681         //restore array elements
682         
683         int numElements;
684         p|numElements;
685         
686         if(p.isUnpacking()){
687                 for(int i=0;i<numElements;i++){
688                 //for(int i=0;i<1;i++){
689                         CkGroupID gID;
690                         CkArrayIndex idx;
691                         p|gID;
692                         p|idx;
693                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
694                         CmiAssert(mgr);
695                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
696                 }
697         }
698         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
699         contribute(cb);
700 }
701
702 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
703 {
704 #if CMK_CHKP_ALL
705         int idx = 1;
706         if(msg->bud1 == CkMyPe()){
707                 idx = 0;
708         }
709         int isChkpting = msg->cp_flag;
710         if(isChkpting == 1){
711                 if(chkpTable[idx]) delete chkpTable[idx];
712         }
713         chkpTable[idx] = msg;
714         if(isChkpting){
715                 recvCount++;
716                 if(recvCount == 2){
717                   if (where == CkCheckPoint_inMEM) {
718                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
719                   }
720                   else if (where == CkCheckPoint_inDISK) {
721                         // another barrier for finalize the writing using fsync
722                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
723                         contribute(0,NULL,CkReduction::sum_int,localcb);
724                   }
725                   else
726                         CmiAbort("Unknown checkpoint scheme");
727                   recvCount = 0;
728                 }
729         }
730 #endif
731 }
732
733 // don't handle array elements
734 static inline void _handleProcData(PUP::er &p, CmiBool create)
735 {
736     // save readonlys, and callback BTW
737     CkPupROData(p);
738
739     // save mainchares 
740     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
741         
742 #ifndef CMK_CHARE_USE_PTR
743     // save non-migratable chare
744     CkPupChareData(p);
745 #endif
746
747     // save groups into Groups.dat
748     CkPupGroupData(p,create);
749
750     // save nodegroups into NodeGroups.dat
751     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
752 }
753
754 void CkMemCheckPT::sendProcData()
755 {
756   // find out size of buffer
757   int size;
758   {
759     PUP::sizer p;
760     _handleProcData(p,CmiTrue);
761     size = p.size();
762   }
763   int packSize = size;
764   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
765   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
766   {
767     PUP::toMem p(msg->packData);
768     _handleProcData(p,CmiTrue);
769   }
770   msg->pe = CkMyPe();
771   msg->len = size;
772   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
773   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
774 }
775
776 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
777 {
778   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
779   CpvAccess(procChkptBuf) = msg;
780   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
781   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
782 }
783
784 // ArrayElement call this function to give us the checkpointed data
785 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
786 {
787   int len = ckTable.length();
788   int idx;
789   for (idx=0; idx<len; idx++) {
790     CkCheckPTInfo *entry = ckTable[idx];
791     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
792   }
793   CkAssert(idx < len);
794   int isChkpting = msg->cp_flag;
795   ckTable[idx]->updateBuffer(msg);
796   if (isChkpting) {
797       // all my array elements have returned their inmem data
798       // inform starter processor that I am done.
799     recvCount ++;
800     if (recvCount == ckTable.length()) {
801       if (where == CkCheckPoint_inMEM) {
802         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
803       }
804       else if (where == CkCheckPoint_inDISK) {
805         // another barrier for finalize the writing using fsync
806         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
807         contribute(0,NULL,CkReduction::sum_int,localcb);
808       }
809       else
810         CmiAbort("Unknown checkpoint scheme");
811       recvCount = 0;
812     } 
813   }
814 }
815
816 // only used in disk checkpointing
817 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
818 {
819   delete m;
820 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
821   system("sync");
822 #endif
823   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
824 }
825
826 // only is called on cpStarter when checkpoint is done
827 void CkMemCheckPT::cpFinish()
828 {
829   CmiAssert(CkMyPe() == cpStarter);
830   peCount++;
831     // now that all processors have finished, activate callback
832   if (peCount == 2) 
833 {
834     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
835     cpCallback.send();
836     peCount = 0;
837     thisProxy.report();
838   }
839 }
840
841 // for debugging, report checkpoint info
842 void CkMemCheckPT::report()
843 {
844 #if !CMK_CHKP_ALL
845   int objsize = 0;
846   int len = ckTable.length();
847   for (int i=0; i<len; i++) {
848     CkCheckPTInfo *entry = ckTable[i];
849     CmiAssert(entry);
850     objsize += entry->getSize();
851   }
852   CmiAssert(CpvAccess(procChkptBuf));
853   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
854 #else
855   if(CkMyPe()==0)
856   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
857 #endif
858 }
859
860 /*****************************************************************************
861                         RESTART Procedure
862 *****************************************************************************/
863
864 // master processor of two buddies
865 inline int CkMemCheckPT::isMaster(int buddype)
866 {
867 #if 0
868   int mype = CkMyPe();
869 //CkPrintf("ismaster: %d %d\n", pe, mype);
870   if (CkNumPes() - totalFailed() == 2) {
871     return mype > buddype;
872   }
873   for (int i=1; i<CkNumPes(); i++) {
874     int me = (buddype+i)%CkNumPes();
875     if (isFailed(me)) continue;
876     if (me == mype) return 1;
877     else return 0;
878   }
879   return 0;
880 #else
881     // smaller one
882   int mype = CkMyPe();
883 //CkPrintf("ismaster: %d %d\n", pe, mype);
884   if (CkNumPes() - totalFailed() == 2) {
885     return mype < buddype;
886   }
887 #if NODE_CHECKPOINT
888   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
889   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
890 #else
891   for (int i=1; i<CkNumPes(); i++) {
892 #endif
893     int me = (mype+i)%CkNumPes();
894     if (isFailed(me)) continue;
895     if (me == buddype) return 1;
896     else return 0;
897   }
898   return 0;
899 #endif
900 }
901
902
903
904 #if 0
905 // helper class to pup all elements that belong to same ckLocMgr
906 class ElementDestoryer : public CkLocIterator {
907 private:
908         CkLocMgr *locMgr;
909 public:
910         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
911         void addLocation(CkLocation &loc) {
912                 CkArrayIndex idx=loc.getIndex();
913                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
914                 loc.destroy();
915         }
916 };
917 #endif
918
919 // restore the bitmap vector for LB
920 void CkMemCheckPT::resetLB(int diepe)
921 {
922 #if CMK_LBDB_ON
923   int i;
924   char *bitmap = new char[CkNumPes()];
925   // set processor available bitmap
926   get_avail_vector(bitmap);
927
928   for (i=0; i<failedPes.length(); i++)
929     bitmap[failedPes[i]] = 0; 
930   bitmap[diepe] = 0;
931
932 #if CK_NO_PROC_POOL
933   set_avail_vector(bitmap);
934 #endif
935
936   // if I am the crashed pe, rebuild my failedPEs array
937   if (CkMyNode() == diepe)
938     for (i=0; i<CkNumPes(); i++) 
939       if (bitmap[i]==0) failed(i);
940
941   delete [] bitmap;
942 #endif
943 }
944
945 // in case when failedPe dies, everybody go through its checkpoint table:
946 // destory all array elements
947 // recover lost buddies
948 // reconstruct all array elements from check point data
949 // called on all processors
950 void CkMemCheckPT::restart(int diePe)
951 {
952 #if CMK_MEM_CHECKPOINT
953   double curTime = CmiWallTimer();
954   if (CkMyPe() == diePe)
955     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
956   stage = (char*)"resetLB";
957   startTime = curTime;
958   if (CkMyPe() == diePe)
959     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
960
961 #if CK_NO_PROC_POOL
962   failed(diePe);        // add into the list of failed pes
963 #endif
964   thisFailedPe = diePe;
965
966   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
967
968   inRestarting = 1;
969                                                                                 
970   // disable load balancer's barrier
971   if (CkMyPe() != diePe) resetLB(diePe);
972
973   CKLOCMGR_LOOP(mgr->startInserting(););
974
975   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
976   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
977 /*
978   if (CkMyPe() == 0)
979     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
980 */
981 #endif
982 }
983
984 // loally remove all array elements
985 void CkMemCheckPT::removeArrayElements()
986 {
987 #if CMK_MEM_CHECKPOINT
988   int len = ckTable.length();
989   double curTime = CmiWallTimer();
990   if (CkMyPe() == thisFailedPe) 
991     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
992   stage = (char*)"removeArrayElements";
993   startTime = curTime;
994
995   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
996   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
997
998   // get rid of all buffering and remote recs
999   // including destorying all array elements
1000 #if CK_NO_PROC_POOL  
1001         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1002 #else
1003         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1004 #endif
1005 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
1006
1007   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1008   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1009 #endif
1010 }
1011
1012 // flush state in reduction manager
1013 void CkMemCheckPT::resetReductionMgr()
1014 {
1015   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1016   int numGroups = CkpvAccess(_groupIDTable)->size();
1017   for(int i=0;i<numGroups;i++) {
1018     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1019     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1020     obj->flushStates();
1021     obj->ckJustMigrated();
1022   }
1023   // reset again
1024   //CpvAccess(_qd)->flushStates();
1025
1026 #if 1
1027   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1028   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1029 #else
1030   if (CkMyPe() == 0)
1031     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1032 #endif
1033 }
1034
1035 // recover the lost buddies
1036 void CkMemCheckPT::recoverBuddies()
1037 {
1038   int idx;
1039   int len = ckTable.length();
1040   // ready to flush reduction manager
1041   // cannot be CkMemCheckPT::restart because destory will modify states
1042   double curTime = CmiWallTimer();
1043   if (CkMyPe() == thisFailedPe)
1044   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1045   stage = (char *)"recoverBuddies";
1046   if (CkMyPe() == thisFailedPe)
1047   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1048   startTime = curTime;
1049
1050   // recover buddies
1051   expectCount = 0;
1052 #if !CMK_CHKP_ALL
1053   for (idx=0; idx<len; idx++) {
1054     CkCheckPTInfo *entry = ckTable[idx];
1055     if (entry->pNo == thisFailedPe) {
1056 #if CK_NO_PROC_POOL
1057       // find a new buddy
1058       int budPe = BuddyPE(CkMyPe());
1059 #else
1060       int budPe = thisFailedPe;
1061 #endif
1062       CkArrayCheckPTMessage *msg = entry->getCopy();
1063       msg->bud1 = budPe;
1064       msg->bud2 = CkMyPe();
1065       msg->cp_flag = 0;            // not checkpointing
1066       thisProxy[budPe].recoverEntry(msg);
1067       expectCount ++;
1068     }
1069   }
1070 #else
1071   //send to failed pe
1072   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1073 #if CK_NO_PROC_POOL
1074       // find a new buddy
1075       int budPe = BuddyPE(CkMyPe());
1076 #else
1077       int budPe = thisFailedPe;
1078 #endif
1079       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1080       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1081           msg->cp_flag = 0;            // not checkpointing
1082       msg->bud1 = budPe;
1083       msg->bud2 = CkMyPe();
1084       thisProxy[budPe].recoverEntry(msg);
1085       expectCount ++;
1086   }
1087 #endif
1088
1089 #if 1
1090   if (expectCount == 0) {
1091     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1092     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1093   }
1094 #else
1095   if (CkMyPe() == 0) {
1096     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1097   }
1098 #endif
1099
1100   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1101 }
1102
1103 void CkMemCheckPT::gotData()
1104 {
1105   ackCount ++;
1106   if (ackCount == expectCount) {
1107     ackCount = 0;
1108     expectCount = -1;
1109     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1110     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1111   }
1112 }
1113
1114 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1115 {
1116
1117   for (int i=0; i<n; i++) {
1118     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1119     mgr->updateLocation(idx[i], nowOnPe);
1120   }
1121         thisProxy[nowOnPe].gotReply();
1122 }
1123
1124 // restore array elements
1125 void CkMemCheckPT::recoverArrayElements()
1126 {
1127   double curTime = CmiWallTimer();
1128   int len = ckTable.length();
1129   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1130   stage = (char *)"recoverArrayElements";
1131   if (CkMyPe() == thisFailedPe)
1132   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1133   startTime = curTime;
1134  int flag = 0;
1135   // recover all array elements
1136   int count = 0;
1137
1138 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1139   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1140   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1141 #endif
1142
1143 #if !CMK_CHKP_ALL
1144   for (int idx=0; idx<len; idx++)
1145   {
1146     CkCheckPTInfo *entry = ckTable[idx];
1147 #if CK_NO_PROC_POOL
1148     // the bigger one will do 
1149 //    if (CkMyPe() < entry->pNo) continue;
1150     if (!isMaster(entry->pNo)) continue;
1151 #else
1152     // smaller one do it, which has the original object
1153     if (CkMyPe() == entry->pNo+1 || 
1154         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1155 #endif
1156 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1157
1158     entry->updateBuddy(CkMyPe(), entry->pNo);
1159     CkArrayCheckPTMessage *msg = entry->getCopy();
1160     // gzheng
1161     //thisProxy[CkMyPe()].inmem_restore(msg);
1162     inmem_restore(msg);
1163 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1164     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1165     int homePe = mgr->homePe(msg->index);
1166     if (homePe != CkMyPe()) {
1167       gmap[homePe].push_back(msg->locMgr);
1168       imap[homePe].push_back(msg->index);
1169     }
1170 #endif
1171     CkFreeMsg(msg);
1172     count ++;
1173   }
1174 #else
1175         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1176 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1177         recoverAll(msg,gmap,imap);
1178 #else
1179         recoverAll(msg);
1180 #endif
1181     CkFreeMsg(msg);
1182 #endif
1183   curTime = CmiWallTimer();
1184   if (CkMyPe() == thisFailedPe)
1185         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1186 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1187   for (int i=0; i<CkNumPes(); i++) {
1188     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1189       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1190         flag++; 
1191           }
1192   }
1193   delete [] imap;
1194   delete [] gmap;
1195 #endif
1196   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1197
1198   CKLOCMGR_LOOP(mgr->doneInserting(););
1199
1200   // _crashedNode = -1;
1201   CpvAccess(_crashedNode) = -1;
1202   inRestarting = 0;
1203 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1204   if (CkMyPe() == 0)
1205     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1206 #else
1207 if(flag == 0)
1208 {
1209     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1210 }
1211 #endif
1212 }
1213
1214 void CkMemCheckPT::gotReply(){
1215     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1216 }
1217
1218 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1219 #if CMK_CHKP_ALL
1220         PUP::fromMem p(msg->packData);
1221         int numElements;
1222         p|numElements;
1223         CkPrintf("[%d] recover all %d\n",CkMyPe(),numElements);
1224         if(p.isUnpacking()){
1225                 for(int i=0;i<numElements;i++){
1226                         CkGroupID gID;
1227                         CkArrayIndex idx;
1228                         p|gID;
1229                         p|idx;
1230                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1231                         int homePe = mgr->homePe(idx);
1232 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1233                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1234 #else
1235                         mgr->resume(idx,p,CmiFalse,CmiTrue);
1236 #endif
1237 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1238                         homePe = mgr->homePe(idx);
1239                         if (homePe != CkMyPe()) {
1240                           gmap[homePe].push_back(gID);
1241                           imap[homePe].push_back(idx);
1242                         }
1243 #endif
1244                 }
1245         }
1246         if(CkMyPe()==thisFailedPe)
1247         CkPrintf("recover all ends\n"); 
1248 #endif
1249 }
1250
1251 static double restartT;
1252
1253 // on every processor
1254 // turn load balancer back on
1255 void CkMemCheckPT::finishUp()
1256 {
1257   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1258   //CKLOCMGR_LOOP(mgr->doneInserting(););
1259   
1260   if (CkMyPe() == thisFailedPe)
1261   {
1262        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1263        //CkStartQD(cpCallback);
1264        cpCallback.send();
1265        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1266   }
1267
1268 #if CK_NO_PROC_POOL
1269 #if NODE_CHECKPOINT
1270   int numnodes = CmiNumPhysicalNodes();
1271 #else
1272   int numnodes = CkNumPes();
1273 #endif
1274   if (numnodes-totalFailed() <=2) {
1275     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1276     _memChkptOn = 0;
1277   }
1278 #endif
1279 }
1280
1281 void CkMemCheckPT::recoverFromSoftFailure()
1282 {
1283         inRestarting = 0;
1284         if(CkMyPe()==0)
1285         cpCallback.send();
1286 }
1287 // called only on 0
1288 void CkMemCheckPT::quiescence(CkCallback &cb)
1289 {
1290   static int pe_count = 0;
1291   pe_count ++;
1292   CmiAssert(CkMyPe() == 0);
1293   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1294   if (pe_count == CkNumPes()) {
1295     pe_count = 0;
1296     cb.send();
1297   }
1298 }
1299
1300 // User callable function - to start a checkpoint
1301 // callback cb is used to pass control back
1302 void CkStartMemCheckpoint(CkCallback &cb)
1303 {
1304 #if CMK_MEM_CHECKPOINT
1305   if (_memChkptOn == 0) {
1306     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1307     cb.send();
1308     return;
1309   }
1310   if (CkInRestarting()) {
1311       // trying to checkpointing during restart
1312     cb.send();
1313     return;
1314   }
1315     // store user callback and user data
1316   CkMemCheckPT::cpCallback = cb;
1317
1318     // broadcast to start check pointing
1319   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1320   checkptMgr.doItNow(CkMyPe(), cb);
1321 #else
1322   // when mem checkpoint is disabled, invike cb immediately
1323   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1324   cb.send();
1325 #endif
1326 }
1327
1328 void CkRestartCheckPoint(int diePe)
1329 {
1330 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1331   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1332   // broadcast
1333   checkptMgr.restart(diePe);
1334 }
1335
1336 static int _diePE = -1;
1337
1338 // callback function used locally by ccs handler
1339 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1340 {
1341 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1342   CkRestartCheckPoint(_diePE);
1343 }
1344
1345
1346 // called on crashed PE
1347 static void restartBeginHandler(char *msg)
1348 {
1349   CmiFree(msg);
1350 #if CMK_MEM_CHECKPOINT
1351 #if CMK_USE_BARRIER
1352         if(CkMyPe()!=_diePE){
1353                 printf("restar begin on %d\n",CkMyPe());
1354                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1355                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1356                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1357         }else{
1358         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1359         CkRestartCheckPointCallback(NULL, NULL);
1360         }
1361 #else
1362   static int count = 0;
1363   CmiAssert(CkMyPe() == _diePE);
1364   count ++;
1365   if (count == CkNumPes()) {
1366     CkRestartCheckPointCallback(NULL, NULL);
1367     count = 0;
1368   }
1369 #endif
1370 #endif
1371 }
1372
1373 extern void _discard_charm_message();
1374 extern void _resume_charm_message();
1375
1376 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1377         return data;
1378 }
1379
1380 static void restartBcastHandler(char *msg)
1381 {
1382 #if CMK_MEM_CHECKPOINT
1383   // advance phase counter
1384   CkMemCheckPT::inRestarting = 1;
1385   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1386   // gzheng
1387   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1388
1389   if (CkMyPe()==_diePE)
1390     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1391
1392   // reset QD counters
1393 /*  gzheng
1394   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1395 */
1396
1397 /*  gzheng
1398   if (CkMyPe()==_diePE)
1399       CkRestartCheckPointCallback(NULL, NULL);
1400 */
1401   CmiFree(msg);
1402
1403   _resume_charm_message();
1404
1405     // reduction
1406   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1407   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1408 #if CMK_USE_BARRIER
1409         //CmiPrintf("before reduce\n"); 
1410         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1411         //CmiPrintf("after reduce\n");  
1412 #else
1413   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1414 #endif 
1415  checkpointed = 0;
1416 #endif
1417 }
1418
1419 extern void _initDone();
1420
1421 bool compare(char * buf1, char *buf2){
1422         return true;
1423 }
1424
1425 static void recvRemoteChkpHandler(char *msg){
1426    envelope *env = (envelope *)msg;
1427    CkUnpackMessage(&env);
1428    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1429   if(CpvAccess(recvdLocal)==1){
1430           int pointer = CpvAccess(curPointer);
1431           int size = CpvAccess(chkpBuf)[pointer]->len;
1432           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1433           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1434                 
1435                 checkptMgr[CkMyPe()].doneComparison(true);
1436                 }else
1437                 {
1438                         checkptMgr[CkMyPe()].doneComparison(false);
1439                 }
1440   }else{
1441           CpvAccess(recvdRemote) = 1;
1442           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1443           CpvAccess(buddyBuf) = chkpMsg;
1444   }  
1445 }
1446
1447 // called on crashed processor
1448 static void recoverProcDataHandler(char *msg)
1449 {
1450 #if CMK_MEM_CHECKPOINT
1451    int i;
1452    envelope *env = (envelope *)msg;
1453    CkUnpackMessage(&env);
1454    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1455    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1456    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1457    //cur_restart_phase ++;
1458      // gzheng ?
1459    //CpvAccess(_qd)->flushStates();
1460
1461    // restore readonly, mainchare, group, nodegroup
1462 //   int temp = cur_restart_phase;
1463 //   cur_restart_phase = -1;
1464    PUP::fromMem p(procMsg->packData);
1465    _handleProcData(p,CmiTrue);
1466
1467    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1468    // gzheng
1469    CKLOCMGR_LOOP(mgr->startInserting(););
1470
1471    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1472    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1473    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1474    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1475
1476    _initDone();
1477 //   CpvAccess(_qd)->flushStates();
1478    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1479 #endif
1480 }
1481
1482 // called on its backup processor
1483 // get backup message buffer and sent to crashed processor
1484 static void askProcDataHandler(char *msg)
1485 {
1486 #if CMK_MEM_CHECKPOINT
1487     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1488     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1489     if (CpvAccess(procChkptBuf) == NULL)  {
1490       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1491       CkAbort("no checkpoint found");
1492     }
1493     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1494     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1495
1496     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1497
1498     CkPackMessage(&env);
1499     CmiSetHandler(env, recoverProcDataHandlerIdx);
1500     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1501     CpvAccess(procChkptBuf) = NULL;
1502     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1503 #endif
1504 }
1505
1506 // called on PE 0
1507 void qd_callback(void *m)
1508 {
1509    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1510    CkFreeMsg(m);
1511 #ifdef CMK_SMP
1512    for(int i=0;i<CmiMyNodeSize();i++){
1513    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1514    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1515         CmiSetHandler(msg, askProcDataHandlerIdx);
1516         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1517         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1518    }
1519    return;
1520 #endif
1521    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1522    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1523    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1524    CmiSetHandler(msg, askProcDataHandlerIdx);
1525    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1526    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1527
1528 }
1529
1530 // on crashed node
1531 void CkMemRestart(const char *dummy, CkArgMsg *args)
1532 {
1533 #if CMK_MEM_CHECKPOINT
1534    _diePE = CmiMyNode();
1535    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1536    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1537    CkMemCheckPT::inRestarting = 1;
1538
1539   CpvAccess( _crashedNode )= CmiMyNode();
1540         
1541   _discard_charm_message();
1542     restartT = CmiWallTimer();
1543    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1544   
1545   /*if(CmiMyRank()==0){
1546     CkCallback cb(qd_callback);
1547     CkStartQD(cb);
1548     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1549   }*/
1550    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1551    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1552    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1553    CmiSetHandler(msg, askProcDataHandlerIdx);
1554    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1555    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1556 #else
1557    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1558 #endif
1559 }
1560
1561 // can be called in other files
1562 // return true if it is in restarting
1563 extern "C"
1564 int CkInRestarting()
1565 {
1566 #if CMK_MEM_CHECKPOINT
1567   if (CpvAccess( _crashedNode)!=-1) return 1;
1568   // gzheng
1569   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1570   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1571   return CkMemCheckPT::inRestarting;
1572 #else
1573   return 0;
1574 #endif
1575 }
1576
1577 extern "C"
1578 void CkSetInLdb(){
1579 #if CMK_MEM_CHECKPOINT
1580         CkMemCheckPT::inLoadbalancing = 1;
1581 #endif
1582 }
1583
1584 extern "C"
1585 int CkInLdb(){
1586 #if CMK_MEM_CHECKPOINT
1587         return CkMemCheckPT::inLoadbalancing;
1588 #endif
1589         return 0;
1590 }
1591
1592 extern "C"
1593 void CkResetInLdb(){
1594 #if CMK_MEM_CHECKPOINT
1595         CkMemCheckPT::inLoadbalancing = 0;
1596 #endif
1597 }
1598
1599 /*****************************************************************************
1600                 module initialization
1601 *****************************************************************************/
1602
1603 static int arg_where = CkCheckPoint_inMEM;
1604
1605 #if CMK_MEM_CHECKPOINT
1606 void init_memcheckpt(char **argv)
1607 {
1608     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1609       arg_where = CkCheckPoint_inDISK;
1610     }
1611
1612         // initiliazing _crashedNode variable
1613         CpvInitialize(int, _crashedNode);
1614         CpvAccess(_crashedNode) = -1;
1615
1616 }
1617 #endif
1618
1619 class CkMemCheckPTInit: public Chare {
1620 public:
1621   CkMemCheckPTInit(CkArgMsg *m) {
1622 #if CMK_MEM_CHECKPOINT
1623     if (arg_where == CkCheckPoint_inDISK) {
1624       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1625     }
1626     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1627     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1628 #endif
1629   }
1630 };
1631
1632 static void notifyHandler(char *msg)
1633 {
1634 #if CMK_MEM_CHECKPOINT
1635   CmiFree(msg);
1636       /* immediately increase restart phase to filter old messages */
1637   CpvAccess(_curRestartPhase) ++;
1638   CpvAccess(_qd)->flushStates();
1639   _discard_charm_message();
1640
1641 #endif
1642 }
1643
1644 extern "C"
1645 void notify_crash(int node)
1646 {
1647 #ifdef CMK_MEM_CHECKPOINT
1648   CpvAccess( _crashedNode) = node;
1649 #ifdef CMK_SMP
1650   for(int i=0;i<CkMyNodeSize();i++){
1651         CpvAccessOther(_crashedNode,i)=node;
1652   }
1653 #endif
1654   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1655   CkMemCheckPT::inRestarting = 1;
1656
1657     // this may be in interrupt handler, send a message to reset QD
1658   int pe = CmiNodeFirst(CkMyNode());
1659   for(int i=0;i<CkMyNodeSize();i++){
1660         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1661         CmiSetHandler(msg, notifyHandlerIdx);
1662         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1663   }
1664 #endif
1665 }
1666
1667 extern "C" void (*notify_crash_fn)(int node);
1668
1669 #if CMK_CONVERSE_MPI
1670 static int pingHandlerIdx;
1671 static int pingCheckHandlerIdx;
1672 static int buddyDieHandlerIdx;
1673 static double lastPingTime = -1;
1674
1675 extern "C" void mpi_restart_crashed(int pe, int rank);
1676 extern "C" int  find_spare_mpirank(int pe);
1677
1678 void pingBuddy();
1679 void pingCheckHandler();
1680
1681 void buddyDieHandler(char *msg)
1682 {
1683 #if CMK_MEM_CHECKPOINT
1684    // notify
1685    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1686    notify_crash(diepe);
1687    // send message to crash pe to let it restart
1688    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1689    int newrank = find_spare_mpirank(diepe);
1690    int buddy = obj->BuddyPE(CmiMyPe());
1691    if (buddy == diepe)  {
1692      mpi_restart_crashed(diepe, newrank);
1693      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1694    }
1695 #endif
1696 }
1697
1698 void pingHandler(void *msg)
1699 {
1700   lastPingTime = CmiWallTimer();
1701   CmiFree(msg);
1702 }
1703
1704 void pingCheckHandler()
1705 {
1706 #if CMK_MEM_CHECKPOINT
1707   double now = CmiWallTimer();
1708   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1709     int i, pe, buddy;
1710     // tell everyone the buddy dies
1711     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1712     for (i = 1; i < CmiNumPes(); i++) {
1713        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1714        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1715     }
1716     buddy = pe;
1717     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1718     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1719       if (obj->isFailed(pe) || pe == buddy) continue;
1720       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1721       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1722       CmiSetHandler(msg, buddyDieHandlerIdx);
1723       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1724     }*/
1725     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1726     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1727     CmiSetHandler(msg, buddyDieHandlerIdx);
1728     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1729   }
1730   else 
1731     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1732 #endif
1733 }
1734
1735 void pingBuddy()
1736 {
1737 #if CMK_MEM_CHECKPOINT
1738   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1739   if (obj) {
1740     int buddy = obj->BuddyPE(CkMyPe());
1741 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1742     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1743     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1744     CmiSetHandler(msg, pingHandlerIdx);
1745     CmiGetRestartPhase(msg) = 9999;
1746     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1747   }
1748   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1749 #endif
1750 }
1751 #endif
1752
1753 // initproc
1754 void CkRegisterRestartHandler( )
1755 {
1756 #if CMK_MEM_CHECKPOINT
1757   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1758   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1759   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1760   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1761   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1762   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1763   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
1764
1765 #if CMK_CONVERSE_MPI
1766   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1767   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1768   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1769 #endif
1770
1771   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1772   CpvInitialize(CkCheckPTMessage **, chkpBuf);
1773   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
1774   CpvInitialize(CkCheckPTMessage *, buddyBuf);
1775   CpvInitialize(int,curPointer);
1776   CpvAccess(procChkptBuf) = NULL;
1777   CpvAccess(buddyBuf) = NULL;
1778   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
1779   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
1780   CpvAccess(chkpBuf)[0] = NULL;
1781   CpvAccess(chkpBuf)[1] = NULL;
1782   CpvAccess(localProcChkpBuf)[0] = NULL;
1783   CpvAccess(localProcChkpBuf)[1] = NULL;
1784   CpvAccess(curPointer) = 0;
1785   CpvAccess(recvdLocal) = 0;
1786   CpvAccess(recvdRemote) = 0;
1787
1788   notify_crash_fn = notify_crash;
1789
1790 #if ! CMK_CONVERSE_MPI
1791   // print pid to kill
1792 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1793 //  sleep(4);
1794 #endif
1795 #endif
1796 }
1797
1798
1799 extern "C"
1800 int CkHasCheckpoints()
1801 {
1802   return checkpointed;
1803 }
1804
1805 /// @todo: the following definitions should be moved to a separate file containing
1806 // structures and functions about fault tolerance strategies
1807
1808 /**
1809  *  * @brief: function for killing a process                                             
1810  *   */
1811 #ifdef CMK_MEM_CHECKPOINT
1812 #if CMK_HAS_GETPID
1813 void killLocal(void *_dummy,double curWallTime){
1814         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1815         if(CmiWallTimer()<killTime-1){
1816                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1817         }else{ 
1818 #if CMK_CONVERSE_MPI
1819                                 CkDieNow();
1820 #else 
1821                 kill(getpid(),SIGKILL);                                               
1822 #endif
1823         }              
1824
1825 #else
1826 void killLocal(void *_dummy,double curWallTime){
1827   CmiAbort("kill() not supported!");
1828 }
1829 #endif
1830 #endif
1831
1832 #ifdef CMK_MEM_CHECKPOINT
1833 /**
1834  * @brief: reads the file with the kill information
1835  */
1836 void readKillFile(){
1837         FILE *fp=fopen(killFile,"r");
1838         if(!fp){
1839                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1840                 return;
1841         }
1842         int proc;
1843         double sec;
1844         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1845                 if(proc == CkMyNode() && CkMyRank() == 0){
1846                         killTime = CmiWallTimer()+sec;
1847                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1848                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1849                 }
1850         }
1851         fclose(fp);
1852 }
1853
1854 #if ! CMK_CONVERSE_MPI
1855 void CkDieNow()
1856 {
1857 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1858          // ignored for non-mpi version
1859         CmiPrintf("[%d] die now.\n", CmiMyPe());
1860         killTime = CmiWallTimer()+0.001;
1861         CcdCallFnAfter(killLocal,NULL,1);
1862 #endif
1863 }
1864 #endif
1865
1866 #endif
1867
1868 #include "CkMemCheckpoint.def.h"
1869
1870