fa77b32188be7b642f1ee52f0bc04ade9638c6fd
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, _remoteCrashedNode);
77
78 // static, so that it is accessible from Converse part
79 int CkMemCheckPT::inRestarting = 0;
80 int CkMemCheckPT::inCheckpointing = 0;
81 int CkMemCheckPT::replicaAlive = 1;
82 int CkMemCheckPT::inLoadbalancing = 0;
83 double CkMemCheckPT::startTime;
84 char *CkMemCheckPT::stage;
85 CkCallback CkMemCheckPT::cpCallback;
86
87 int _memChkptOn = 1;                    // checkpoint is on or off
88
89 CkGroupID ckCheckPTGroupID;             // readonly
90
91 static int checkpointed = 0;
92
93 /// @todo the following declarations should be moved into a separate file for all 
94 // fault tolerant strategies
95
96 #ifdef CMK_MEM_CHECKPOINT
97 // name of the kill file that contains processes to be killed 
98 char *killFile;                                               
99 // flag for the kill file         
100 int killFlag=0;
101 // variable for storing the killing time
102 double killTime=0.0;
103 #endif
104
105 #ifdef CKLOCMGR_LOOP
106 #undef CKLOCMGR_LOOP
107 #endif
108 // loop over all CkLocMgr and do "code"
109 #define  CKLOCMGR_LOOP(code)    {       \
110   int numGroups = CkpvAccess(_groupIDTable)->size();    \
111   for(int i=0;i<numGroups;i++) {        \
112     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
113     if(obj->isLocMgr())  {      \
114       CkLocMgr *mgr = (CkLocMgr*)obj;   \
115       code      \
116     }   \
117   }     \
118 }
119
120 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
121 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
122 CpvDeclare(CkCheckPTMessage**, chkpBuf);
123 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
124 //store the checkpoint of the buddy to compare
125 //do not need the whole msg, can be the checksum
126 CpvDeclare(CkCheckPTMessage*, buddyBuf);
127 //pointer of the checkpoint going to be written
128 CpvDeclare(int, curPointer);
129 CpvDeclare(int, recvdRemote);
130 CpvDeclare(int, recvdLocal);
131 CpvDeclare(int, localChkpDone);
132 CpvDeclare(int, remoteChkpDone);
133 CpvDeclare(int, recvdArrayChkp);
134 CpvDeclare(int, recvdProcChkp);
135 CpvDeclare(int, localChecksum);
136 CpvDeclare(int, remoteChecksum);
137
138 bool compare(char * buf1, char * buf2);
139 int getChecksum(char * buf);
140 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
141 // Converse function handles
142 static int askPhaseHandlerIdx;
143 static int recvPhaseHandlerIdx;
144 static int askProcDataHandlerIdx;
145 static int restartBcastHandlerIdx;
146 static int recoverProcDataHandlerIdx;
147 static int restartBeginHandlerIdx;
148 static int recvRemoteChkpHandlerIdx;
149 static int replicaDieHandlerIdx;
150 static int replicaDieBcastHandlerIdx;
151 static int replicaRecoverHandlerIdx;
152 static int replicaChkpDoneHandlerIdx;
153 static int recoverRemoteProcDataHandlerIdx;
154 static int recoverRemoteArrayDataHandlerIdx;
155 static int notifyHandlerIdx;
156 // compute the backup processor
157 // FIXME: avoid crashed processors
158 #if CMK_CONVERSE_MPI
159 static int pingHandlerIdx;
160 static int pingCheckHandlerIdx;
161 static int buddyDieHandlerIdx;
162 static double lastPingTime = -1;
163
164 extern "C" void mpi_restart_crashed(int pe, int rank);
165 extern "C" int  find_spare_mpirank(int pe,int partition);
166
167 void pingBuddy();
168 void pingCheckHandler();
169
170 #endif
171 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
172
173 inline int CkMemCheckPT::BuddyPE(int pe)
174 {
175   int budpe;
176 #if NODE_CHECKPOINT
177   // buddy is the processor with same rank on the next physical node
178   int r1 = CmiPhysicalRank(pe);
179   int budnode = CmiPhysicalNodeID(pe);
180   do {
181     budnode = (budnode+1)%CmiNumPhysicalNodes();
182     int *pelist;
183     int num;
184     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
185     budpe = pelist[r1 % num];
186   } while (isFailed(budpe));
187   if (budpe == pe) {
188     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
189     CmiAbort("Failed to find a buddy processor");
190   }
191 #else
192   budpe = pe;
193   budpe = (budpe+1)%CkNumPes();
194 #endif
195   return budpe;
196 }
197
198 // called in array element constructor
199 // choose and register with 2 buddies for checkpoiting 
200 #if CMK_MEM_CHECKPOINT
201 void ArrayElement::init_checkpt() {
202   if (_memChkptOn == 0) return;
203   if (CkInRestarting()) {
204     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
205   }
206   // only master init checkpoint
207   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
208
209   budPEs[0] = CkMyPe();
210   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
211   CmiAssert(budPEs[0] != budPEs[1]);
212   // inform checkPTMgr
213   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
214   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
215   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
216   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
217 }
218 #endif
219
220 // entry function invoked by checkpoint mgr asking for checkpoint data
221 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
222 #if CMK_MEM_CHECKPOINT
223   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
224   //char index[128];   thisIndexMax.sprint(index);
225   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
226   CkLocMgr *locMgr = thisArray->getLocMgr();
227   CmiAssert(myRec!=NULL);
228   int size;
229   {
230     PUP::sizer p;
231     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
232     size = p.size();
233   }
234   int packSize = size/sizeof(double) +1;
235   CkArrayCheckPTMessage *msg =
236     new (packSize, 0) CkArrayCheckPTMessage;
237   msg->len = size;
238   msg->index =thisIndexMax;
239   msg->aid = thisArrayID;
240   msg->locMgr = locMgr->getGroupID();
241   msg->cp_flag = 1;
242   {
243     PUP::toMem p(msg->packData);
244     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
245   }
246
247   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
248   checkptMgr.recvData(msg, 2, budPEs);
249   delete m;
250 #endif
251 }
252
253 // checkpoint holder class - for memory checkpointing
254 class CkMemCheckPTInfo: public CkCheckPTInfo
255 {
256   CkArrayCheckPTMessage *ckBuffer;
257   public:
258   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
259     CkCheckPTInfo(a, loc, idx, pno)
260   {
261     ckBuffer = NULL;
262   }
263   ~CkMemCheckPTInfo() 
264   {
265     if (ckBuffer) delete ckBuffer; 
266   }
267   inline void updateBuffer(CkArrayCheckPTMessage *data) 
268   {
269     CmiAssert(data!=NULL);
270     if (ckBuffer) delete ckBuffer;
271     ckBuffer = data;
272   }    
273   inline CkArrayCheckPTMessage * getCopy()
274   {
275     if (ckBuffer == NULL) {
276       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
277       CmiAbort("Abort!");
278     }
279     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
280   }     
281   inline void updateBuddy(int b1, int b2) {
282     CmiAssert(ckBuffer);
283     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
284     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
285     CmiAssert(pNo != CkMyPe());
286   }
287   inline int getSize() { 
288     CmiAssert(ckBuffer);
289     return ckBuffer->len; 
290   }
291 };
292
293 // checkpoint holder class - for in-disk checkpointing
294 class CkDiskCheckPTInfo: public CkCheckPTInfo 
295 {
296   char *fname;
297   int bud1, bud2;
298   int len;                      // checkpoint size
299   public:
300   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
301   {
302 #if CMK_USE_MKSTEMP
303     fname = new char[64];
304     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
305     mkstemp(fname);
306 #else
307     fname=tmpnam(NULL);
308 #endif
309     bud1 = bud2 = -1;
310     len = 0;
311   }
312   ~CkDiskCheckPTInfo() 
313   {
314     remove(fname);
315   }
316   inline void updateBuffer(CkArrayCheckPTMessage *data) 
317   {
318     double t = CmiWallTimer();
319     // unpack it
320     envelope *env = UsrToEnv(data);
321     CkUnpackMessage(&env);
322     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
323     FILE *f = fopen(fname,"wb");
324     PUP::toDisk p(f);
325     CkPupMessage(p, (void **)&data);
326     // delay sync to the end because otherwise the messages are blocked
327     //    fsync(fileno(f));
328     fclose(f);
329     bud1 = data->bud1;
330     bud2 = data->bud2;
331     len = data->len;
332     delete data;
333     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
334   }
335   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
336   {
337     CkArrayCheckPTMessage *data;
338     FILE *f = fopen(fname,"rb");
339     PUP::fromDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     fclose(f);
342     data->bud1 = bud1;                          // update the buddies
343     data->bud2 = bud2;
344     return data;
345   }
346   inline void updateBuddy(int b1, int b2) {
347     bud1 = b1; bud2 = b2;
348     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
349     CmiAssert(pNo != CkMyPe());
350   }
351   inline int getSize() { 
352     return len; 
353   }
354 };
355
356 CkMemCheckPT::CkMemCheckPT(int w)
357 {
358   int numnodes = 0;
359 #if NODE_CHECKPOINT
360   numnodes = CmiNumPhysicalNodes();
361 #else
362   numnodes = CkNumPes();
363 #endif
364 #if CK_NO_PROC_POOL
365   if (numnodes <= 2)
366 #else
367     if (numnodes  == 1)
368 #endif
369     {
370       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
371       _memChkptOn = 0;
372     }
373   inRestarting = 0;
374   inCheckpointing = 0;
375   recvCount = peCount = 0;
376   ackCount = 0;
377   expectCount = -1;
378   where = w;
379   replicaAlive = 1;
380   notifyReplica = 0;
381 #if CMK_CONVERSE_MPI
382   void pingBuddy();
383   void pingCheckHandler();
384   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
385   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
386 #endif
387   chkpTable[0] = NULL;
388   chkpTable[1] = NULL;
389   maxIter = -1;
390   recvIterCount = 0;
391   localDecided = false;
392 }
393
394 CkMemCheckPT::~CkMemCheckPT()
395 {
396   int len = ckTable.length();
397   for (int i=0; i<len; i++) {
398     delete ckTable[i];
399   }
400 }
401
402 void CkMemCheckPT::pup(PUP::er& p) 
403
404   CBase_CkMemCheckPT::pup(p); 
405   p|cpStarter;
406   p|thisFailedPe;
407   p|failedPes;
408   p|ckCheckPTGroupID;           // recover global variable
409   p|cpCallback;                 // store callback
410   p|where;                      // where to checkpoint
411   p|peCount;
412   if (p.isUnpacking()) {
413     recvCount = 0;
414 #if CMK_CONVERSE_MPI
415     void pingBuddy();
416     void pingCheckHandler();
417     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
418     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
419 #endif
420     maxIter = -1;
421     recvIterCount = 0;
422     localDecided = false;
423   }
424 }
425
426 void CkMemCheckPT::getIter(){
427   localDecided = true;
428   localMaxIter = maxIter+1;
429   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
430   int elemCount = CkCountChkpSyncElements();
431   if(elemCount == 0){
432     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
433   }
434 }
435
436 //when one replica failes, decide the next chkp iter
437
438 void CkMemCheckPT::recvIter(int iter){
439   if(maxIter<iter){
440     maxIter=iter;
441   }
442 }
443
444 void CkMemCheckPT::recvMaxIter(int iter){
445   localDecided = false;
446   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
447 }
448
449 void CkMemCheckPT::reachChkpIter(){
450   recvIterCount++;
451   elemCount = CkCountChkpSyncElements();
452   if(recvIterCount == elemCount){
453     recvIterCount = 0;
454     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
455   }
456 }
457
458 void CkMemCheckPT::startChkp(){
459   CkPrintf("start checkpoint\n");
460   CkStartMemCheckpoint(cpCallback);
461 }
462
463 // called by checkpoint mgr to restore an array element
464 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
465 {
466 #if CMK_MEM_CHECKPOINT
467   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
468   // m->index.print();
469   PUP::fromMem p(m->packData);
470   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
471   CmiAssert(mgr);
472 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
473   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
474 #else
475   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
476 #endif
477
478   // find a list of array elements bound together
479   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
480   CmiAssert(elt);
481   CkLocRec_local *rec = elt->myRec;
482   CkVec<CkMigratable *> list;
483   mgr->migratableList(rec, list);
484   CmiAssert(list.length() > 0);
485   for (int l=0; l<list.length(); l++) {
486     elt = (ArrayElement *)list[l];
487     elt->budPEs[0] = m->bud1;
488     elt->budPEs[1] = m->bud2;
489     //    reset, may not needed now
490     // for now.
491     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
492       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
493       if (c) c->redNo = 0;
494     }
495   }
496 #endif
497 }
498
499 // return 1 if pe is a crashed processor
500 int CkMemCheckPT::isFailed(int pe)
501 {
502   for (int i=0; i<failedPes.length(); i++)
503     if (failedPes[i] == pe) return 1;
504   return 0;
505 }
506
507 // add pe into history list of all failed processors
508 void CkMemCheckPT::failed(int pe)
509 {
510   if (isFailed(pe)) return;
511   failedPes.push_back(pe);
512 }
513
514 int CkMemCheckPT::totalFailed()
515 {
516   return failedPes.length();
517 }
518
519 // create an checkpoint entry for array element of aid with index.
520 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
521 {
522   // error check, no duplicate
523   int idx, len = ckTable.size();
524   for (idx=0; idx<len; idx++) {
525     CkCheckPTInfo *entry = ckTable[idx];
526     if (index == entry->index) {
527       if (loc == entry->locMgr) {
528         // bindTo array elements
529         return;
530       }
531       // for array inheritance, the following check may fail
532       // because ArrayElement constructor of all superclasses are called
533       if (aid == entry->aid) {
534         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
535         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
536       }
537     }
538   }
539   CkCheckPTInfo *newEntry;
540   if (where == CkCheckPoint_inMEM)
541     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
542   else
543     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
544   ckTable.push_back(newEntry);
545   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
546 }
547
548 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
549 {
550 #if !CMK_CHKP_ALL       
551   int buddy = msg->bud1;
552   if (buddy == CkMyPe()) buddy = msg->bud2;
553   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
554   recvData(msg);
555   // ack
556   thisProxy[buddy].gotData();
557 #else
558   chkpTable[0] = NULL;
559   chkpTable[1] = NULL;
560   recvArrayCheckpoint(msg);
561   thisProxy[msg->bud2].gotData();
562 #endif
563 }
564
565 // loop through my checkpoint table and ask checkpointed array elements
566 // to send me checkpoint data.
567 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
568 {
569   checkpointed = 1;
570   cpCallback = cb;
571   cpStarter = starter;
572   inCheckpointing = 1;
573   if (CkMyPe() == cpStarter) {
574     startTime = CmiWallTimer();
575     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
576   }
577 #if CMK_CONVERSE_MPI
578   if(CmiNumPartition()==1)
579 #endif    
580   {
581
582 #if !CMK_CHKP_ALL
583     int len = ckTable.length();
584     for (int i=0; i<len; i++) {
585       CkCheckPTInfo *entry = ckTable[i];
586       // always let the bigger number processor send request
587       //if (CkMyPe() < entry->pNo) continue;
588       // always let the smaller number processor send request, may on same proc
589       if (!isMaster(entry->pNo)) continue;
590       // call inmem_checkpoint to the array element, ask it to send
591       // back checkpoint data via recvData().
592       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
593       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
594     }
595     // if my table is empty, then I am done
596     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
597 #else
598     startArrayCheckpoint();
599 #endif
600     sendProcData();
601   }
602 #if CMK_CONVERSE_MPI
603   else
604   {
605     startCheckpoint();
606   }
607 #endif
608   // pack and send proc level data
609 }
610
611 class MemElementPacker : public CkLocIterator{
612   private:
613     //    CkLocMgr *locMgr;
614     PUP::er &p;
615     std::map<CkHashCode,CkLocation> arrayMap;
616   public:
617     MemElementPacker(PUP::er &p_):p(p_){};
618     void addLocation(CkLocation &loc){
619       CkArrayIndexMax idx = loc.getIndex();
620       arrayMap[idx.hash()] = loc;
621     }
622     void writeCheckpoint(){
623       std::map<CkHashCode, CkLocation>::iterator it;
624       for(it = arrayMap.begin();it!=arrayMap.end();it++){
625         CkLocation loc = it->second;
626         CkLocMgr *locMgr = loc.getManager();
627         CkArrayIndexMax idx = loc.getIndex();
628         CkGroupID gID = locMgr->ckGetGroupID();
629         p|gID;
630         p|idx;
631         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
632       }
633     }
634 };
635
636 void pupAllElements(PUP::er &p){
637 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
638   int numElements;
639   if(!p.isUnpacking()){
640     numElements = CkCountArrayElements();
641   }
642   p | numElements;
643   if(!p.isUnpacking()){
644     MemElementPacker packer(p);
645     CKLOCMGR_LOOP(mgr->iterate(packer););
646     packer.writeCheckpoint();
647   }
648 #endif
649 }
650
651 void CkMemCheckPT::startArrayCheckpoint(){
652 #if CMK_CHKP_ALL
653   int size;
654   {
655     PUP::sizer psizer;
656     pupAllElements(psizer);
657     size = psizer.size();
658   }
659   int packSize = size/sizeof(double)+1;
660   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
661   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
662   msg->len = size;
663   msg->cp_flag = 1;
664   int budPEs[2];
665   msg->bud1=CkMyPe();
666   msg->bud2=ChkptOnPe(CkMyPe());
667   {
668     PUP::toMem p(msg->packData);
669     pupAllElements(p);
670   }
671   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
672   if(chkpTable[0]) delete chkpTable[0];
673   chkpTable[0] = msg;
674   //send the checkpoint to my 
675   recvCount++;
676 #endif
677 }
678
679 void CkMemCheckPT::startCheckpoint(){
680 #if CMK_CONVERSE_MPI
681   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
682     CkPrintf("in start checkpointing!!!!\n");
683   int size;
684   {
685     PUP::sizer p;
686     _handleProcData(p,CmiFalse);
687     size = p.size();
688   }
689   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
690   procMsg->len = size;
691   procMsg->cp_flag = 1;
692   {
693     PUP::toMem p(procMsg->packData);
694     _handleProcData(p,CmiFalse);
695   }
696   int pointer = CpvAccess(curPointer);
697   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
698   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
699
700   {
701     PUP::sizer psizer;
702     pupAllElements(psizer);
703     size = psizer.size();
704   }
705   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
706   msg->len = size;
707   msg->cp_flag = 1;
708   int checksum;
709   {
710 #if CMK_USE_CHECKSUM
711     PUP::checker p(msg->packData);
712     pupAllElements(p);
713     checksum = p.getChecksum();
714 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
715 #else 
716     PUP::toMem p(msg->packData);
717     pupAllElements(p);
718 #endif
719   }
720   pointer = CpvAccess(curPointer);
721   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
722   CpvAccess(chkpBuf)[pointer] = msg;
723   if(CkMyPe()==0)
724     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
725   if(CkReplicaAlive()==1){
726     CpvAccess(recvdLocal) = 1;
727 #if CMK_USE_CHECKSUM
728 //    CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
729 //    CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
730 //    delete tmpMsg;
731     CpvAccess(localChecksum) = checksum;
732     char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
733     *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
734     CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
735     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
736 #else
737     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
738     CkPackMessage(&env);
739     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
740     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
741 #endif
742   }
743   if(CpvAccess(recvdRemote)==1){
744     //compare the checkpoint 
745     int size = CpvAccess(chkpBuf)[pointer]->len;
746 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
747 #if CMK_USE_CHECKSUM
748     if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
749       thisProxy[CkMyPe()].doneComparison(true);
750     }
751 #else
752     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
753       thisProxy[CkMyPe()].doneComparison(true);
754     }
755 #endif
756     else{
757       //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
758       thisProxy[CkMyPe()].doneComparison(false);
759     }
760     if(CkMyPe()==0)
761       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
762   }
763   else{
764     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
765       { 
766         int pointer = CpvAccess(curPointer);
767         //send the proc data
768         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
769         procMsg->pointer = pointer;
770         envelope * env = (envelope *)(UsrToEnv(procMsg));
771         CkPackMessage(&env);
772         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
773         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
774         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
775           CkPrintf("[%d] sendProcdata\n",CkMyPe());
776         }
777       }
778       //send the array checkpoint data
779       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
780       msg->pointer = CpvAccess(curPointer);
781       envelope * env = (envelope *)(UsrToEnv(msg));
782       CkPackMessage(&env);
783       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
784       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
785       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
786         CkPrintf("[%d] sendArraydata\n",CkMyPe());
787       //can continue work, no need to wait for my replica
788     }
789   }
790 #endif
791 }
792
793 void CkMemCheckPT::doneComparison(bool ret){
794   int _ret = 1;
795   if(!ret){
796     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
797     _ret = 0;
798   }else{
799     _ret = 1;
800   }
801   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
802   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
803 }
804
805 void CkMemCheckPT::doneRComparison(int ret){
806   //    if(CpvAccess(curPointer) == 0){
807   if(ret==CkNumPes()){
808     CpvAccess(localChkpDone) = 1;
809     if(CpvAccess(remoteChkpDone) ==1){
810       thisProxy.doneBothComparison();
811     }
812     if(notifyReplica == 0){
813       //notify the replica am done
814       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
815       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
816       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
817       notifyReplica = 1;
818     }
819   }
820   else{
821     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
822     startTime = CmiWallTimer();
823     thisProxy.RollBack();
824   }
825 }
826
827 void CkMemCheckPT::doneBothComparison(){
828   CpvAccess(recvdRemote) = 0;
829   CpvAccess(recvdLocal) = 0;
830   CpvAccess(localChkpDone) = 0;
831   CpvAccess(remoteChkpDone) = 0;
832   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
833   CpvAccess(curPointer)^=1;
834   inCheckpointing = 0;
835   notifyReplica = 0;
836   if(CkMyPe() == 0){
837     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,size);
838   }
839   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
840 }
841
842 void CkMemCheckPT::RollBack(){
843   //restore group data
844   checkpointed = 0;
845   CkMemCheckPT::inRestarting = 1;
846   int pointer = CpvAccess(curPointer)^1;//use the previous one
847   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
848   PUP::fromMem p(chkpMsg->packData);    
849
850   //destroy array elements
851   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
852   int numGroups = CkpvAccess(_groupIDTable)->size();
853   for(int i=0;i<numGroups;i++) {
854     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
855     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
856     obj->flushStates();
857     obj->ckJustMigrated();
858   }
859   //restore array elements
860
861   int numElements;
862   p|numElements;
863
864   if(p.isUnpacking()){
865     for(int i=0;i<numElements;i++){
866       CkGroupID gID;
867       CkArrayIndex idx;
868       p|gID;
869       p|idx;
870       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
871       CmiAssert(mgr);
872       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
873     }
874     }
875     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
876     contribute(cb);
877   }
878
879   void CkMemCheckPT::notifyReplicaDie(int pe){
880     replicaAlive = 0;
881     CpvAccess(_remoteCrashedNode) = pe;
882   }
883
884   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
885   {
886 #if CMK_CHKP_ALL
887     int idx = 1;
888     if(msg->bud1 == CkMyPe()){
889       idx = 0;
890     }
891     int isChkpting = msg->cp_flag;
892     if(isChkpting == 1){
893       if(chkpTable[idx]) delete chkpTable[idx];
894     }
895     chkpTable[idx] = msg;
896     if(isChkpting){
897       recvCount++;
898       if(recvCount == 2){
899         if (where == CkCheckPoint_inMEM) {
900           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
901         }
902         else if (where == CkCheckPoint_inDISK) {
903           // another barrier for finalize the writing using fsync
904           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
905           contribute(0,NULL,CkReduction::sum_int,localcb);
906         }
907         else
908           CmiAbort("Unknown checkpoint scheme");
909         recvCount = 0;
910       }
911     }
912 #endif
913   }
914
915   // don't handle array elements
916   static inline void _handleProcData(PUP::er &p, CmiBool create)
917   {
918     // save readonlys, and callback BTW
919     CkPupROData(p);
920
921     // save mainchares 
922     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
923
924 #ifndef CMK_CHARE_USE_PTR
925     // save non-migratable chare
926     CkPupChareData(p);
927 #endif
928
929     // save groups into Groups.dat
930     CkPupGroupData(p,create);
931
932     // save nodegroups into NodeGroups.dat
933     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
934   }
935
936   void CkMemCheckPT::sendProcData()
937   {
938     // find out size of buffer
939     int size;
940     {
941       PUP::sizer p;
942       _handleProcData(p,CmiTrue);
943       size = p.size();
944     }
945     int packSize = size;
946     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
947     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
948     {
949       PUP::toMem p(msg->packData);
950       _handleProcData(p,CmiTrue);
951     }
952     msg->pe = CkMyPe();
953     msg->len = size;
954     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
955     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
956   }
957
958   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
959   {
960     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
961     CpvAccess(procChkptBuf) = msg;
962     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
963     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
964   }
965
966   // ArrayElement call this function to give us the checkpointed data
967   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
968   {
969     int len = ckTable.length();
970     int idx;
971     for (idx=0; idx<len; idx++) {
972       CkCheckPTInfo *entry = ckTable[idx];
973       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
974     }
975     CkAssert(idx < len);
976     int isChkpting = msg->cp_flag;
977     ckTable[idx]->updateBuffer(msg);
978     if (isChkpting) {
979       // all my array elements have returned their inmem data
980       // inform starter processor that I am done.
981       recvCount ++;
982       if (recvCount == ckTable.length()) {
983         if (where == CkCheckPoint_inMEM) {
984           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
985         }
986         else if (where == CkCheckPoint_inDISK) {
987           // another barrier for finalize the writing using fsync
988           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
989           contribute(0,NULL,CkReduction::sum_int,localcb);
990         }
991         else
992           CmiAbort("Unknown checkpoint scheme");
993         recvCount = 0;
994       } 
995     }
996   }
997
998   // only used in disk checkpointing
999   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1000   {
1001     delete m;
1002 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1003     system("sync");
1004 #endif
1005     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1006   }
1007
1008   // only is called on cpStarter when checkpoint is done
1009   void CkMemCheckPT::cpFinish()
1010   {
1011     CmiAssert(CkMyPe() == cpStarter);
1012     peCount++;
1013     // now that all processors have finished, activate callback
1014     if (peCount == 2) 
1015     {
1016       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1017       peCount = 0;
1018       thisProxy.report();
1019     }
1020   }
1021
1022   // for debugging, report checkpoint info
1023   void CkMemCheckPT::report()
1024   {
1025     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1026     inCheckpointing = 0;
1027 #if !CMK_CHKP_ALL
1028     int objsize = 0;
1029     int len = ckTable.length();
1030     for (int i=0; i<len; i++) {
1031       CkCheckPTInfo *entry = ckTable[i];
1032       CmiAssert(entry);
1033       objsize += entry->getSize();
1034     }
1035     CmiAssert(CpvAccess(procChkptBuf));
1036     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1037 #else
1038     if(CkMyPe()==0)
1039       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1040 #endif
1041   }
1042
1043   /*****************************************************************************
1044     RESTART Procedure
1045    *****************************************************************************/
1046
1047   // master processor of two buddies
1048   inline int CkMemCheckPT::isMaster(int buddype)
1049   {
1050 #if 0
1051     int mype = CkMyPe();
1052     //CkPrintf("ismaster: %d %d\n", pe, mype);
1053     if (CkNumPes() - totalFailed() == 2) {
1054       return mype > buddype;
1055     }
1056     for (int i=1; i<CkNumPes(); i++) {
1057       int me = (buddype+i)%CkNumPes();
1058       if (isFailed(me)) continue;
1059       if (me == mype) return 1;
1060       else return 0;
1061     }
1062     return 0;
1063 #else
1064     // smaller one
1065     int mype = CkMyPe();
1066     //CkPrintf("ismaster: %d %d\n", pe, mype);
1067     if (CkNumPes() - totalFailed() == 2) {
1068       return mype < buddype;
1069     }
1070 #if NODE_CHECKPOINT
1071     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1072     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1073 #else
1074       for (int i=1; i<CkNumPes(); i++) {
1075 #endif
1076         int me = (mype+i)%CkNumPes();
1077         if (isFailed(me)) continue;
1078         if (me == buddype) return 1;
1079         else return 0;
1080       }
1081       return 0;
1082 #endif
1083     }
1084
1085
1086
1087 #if 0
1088     // helper class to pup all elements that belong to same ckLocMgr
1089     class ElementDestoryer : public CkLocIterator {
1090       private:
1091         CkLocMgr *locMgr;
1092       public:
1093         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1094         void addLocation(CkLocation &loc) {
1095           CkArrayIndex idx=loc.getIndex();
1096           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1097           loc.destroy();
1098         }
1099     };
1100 #endif
1101
1102     // restore the bitmap vector for LB
1103     void CkMemCheckPT::resetLB(int diepe)
1104     {
1105 #if CMK_LBDB_ON
1106       int i;
1107       char *bitmap = new char[CkNumPes()];
1108       // set processor available bitmap
1109       get_avail_vector(bitmap);
1110       for (i=0; i<failedPes.length(); i++)
1111         bitmap[failedPes[i]] = 0; 
1112       bitmap[diepe] = 0;
1113
1114 #if CK_NO_PROC_POOL
1115       set_avail_vector(bitmap);
1116 #endif
1117
1118       // if I am the crashed pe, rebuild my failedPEs array
1119       if (CkMyNode() == diepe)
1120         for (i=0; i<CkNumPes(); i++) 
1121           if (bitmap[i]==0) failed(i);
1122
1123       delete [] bitmap;
1124 #endif
1125     }
1126
1127     static double restartT;
1128
1129     // in case when failedPe dies, everybody go through its checkpoint table:
1130     // destory all array elements
1131     // recover lost buddies
1132     // reconstruct all array elements from check point data
1133     // called on all processors
1134     void CkMemCheckPT::restart(int diePe)
1135     {
1136 #if CMK_MEM_CHECKPOINT
1137       double curTime = CmiWallTimer();
1138       if (CkMyPe() == diePe){
1139         restartT = CmiWallTimer();
1140         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1141       }
1142       stage = (char*)"resetLB";
1143       startTime = curTime;
1144       if (CkMyPe() == diePe)
1145         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1146
1147 #if CK_NO_PROC_POOL
1148       failed(diePe);    // add into the list of failed pes
1149 #endif
1150       thisFailedPe = diePe;
1151
1152       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1153
1154       inRestarting = 1;
1155
1156       // disable load balancer's barrier
1157       //if (CkMyPe() != diePe) resetLB(diePe);
1158
1159       CKLOCMGR_LOOP(mgr->startInserting(););
1160
1161
1162       if(CmiNumPartition()==1){
1163         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1164       }else{
1165         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1166         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1167       }
1168 #endif
1169     }
1170
1171     // loally remove all array elements
1172     void CkMemCheckPT::removeArrayElements()
1173     {
1174 #if CMK_MEM_CHECKPOINT
1175       int len = ckTable.length();
1176       double curTime = CmiWallTimer();
1177       if (CkMyPe() == thisFailedPe) 
1178         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1179       stage = (char*)"removeArrayElements";
1180       startTime = curTime;
1181
1182       //  if (cpCallback.isInvalid()) 
1183       //          CkPrintf("invalid pe %d\n",CkMyPe());
1184       //          CkAbort("Didn't set restart callback\n");;
1185       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1186
1187       // get rid of all buffering and remote recs
1188       // including destorying all array elements
1189 #if CK_NO_PROC_POOL  
1190       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1191 #else
1192       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1193 #endif
1194       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1195 #endif
1196     }
1197
1198     // flush state in reduction manager
1199     void CkMemCheckPT::resetReductionMgr()
1200     {
1201       if (CkMyPe() == thisFailedPe) 
1202         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1203       int numGroups = CkpvAccess(_groupIDTable)->size();
1204       for(int i=0;i<numGroups;i++) {
1205         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1206         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1207         obj->flushStates();
1208         obj->ckJustMigrated();
1209       }
1210       // reset again
1211       //CpvAccess(_qd)->flushStates();
1212       if(CmiNumPartition()==1){
1213         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1214       }
1215       else
1216         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1217     }
1218
1219     // recover the lost buddies
1220     void CkMemCheckPT::recoverBuddies()
1221     {
1222       int idx;
1223       int len = ckTable.length();
1224       // ready to flush reduction manager
1225       // cannot be CkMemCheckPT::restart because destory will modify states
1226       double curTime = CmiWallTimer();
1227       if (CkMyPe() == thisFailedPe)
1228         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1229       stage = (char *)"recoverBuddies";
1230       if (CkMyPe() == thisFailedPe)
1231         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1232       startTime = curTime;
1233
1234       // recover buddies
1235       expectCount = 0;
1236 #if !CMK_CHKP_ALL
1237       for (idx=0; idx<len; idx++) {
1238         CkCheckPTInfo *entry = ckTable[idx];
1239         if (entry->pNo == thisFailedPe) {
1240 #if CK_NO_PROC_POOL
1241           // find a new buddy
1242           int budPe = BuddyPE(CkMyPe());
1243 #else
1244           int budPe = thisFailedPe;
1245 #endif
1246           CkArrayCheckPTMessage *msg = entry->getCopy();
1247           msg->bud1 = budPe;
1248           msg->bud2 = CkMyPe();
1249           msg->cp_flag = 0;            // not checkpointing
1250           thisProxy[budPe].recoverEntry(msg);
1251           expectCount ++;
1252         }
1253       }
1254 #else
1255       //send to failed pe
1256       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1257 #if CK_NO_PROC_POOL
1258         // find a new buddy
1259         int budPe = BuddyPE(CkMyPe());
1260 #else
1261         int budPe = thisFailedPe;
1262 #endif
1263         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1264         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1265         msg->cp_flag = 0;            // not checkpointing
1266         msg->bud1 = budPe;
1267         msg->bud2 = CkMyPe();
1268         thisProxy[budPe].recoverEntry(msg);
1269         expectCount ++;
1270       }
1271 #endif
1272
1273       if (expectCount == 0) {
1274         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1275       }
1276       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1277     }
1278
1279     void CkMemCheckPT::gotData()
1280     {
1281       ackCount ++;
1282       if (ackCount == expectCount) {
1283         ackCount = 0;
1284         expectCount = -1;
1285         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1286         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1287       }
1288     }
1289
1290     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1291     {
1292
1293       for (int i=0; i<n; i++) {
1294         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1295         mgr->updateLocation(idx[i], nowOnPe);
1296       }
1297       thisProxy[nowOnPe].gotReply();
1298     }
1299
1300     // restore array elements
1301     void CkMemCheckPT::recoverArrayElements()
1302     {
1303       double curTime = CmiWallTimer();
1304       int len = ckTable.length();
1305       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1306       stage = (char *)"recoverArrayElements";
1307       if (CkMyPe() == thisFailedPe)
1308         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1309       startTime = curTime;
1310       int flag = 0;
1311       // recover all array elements
1312       int count = 0;
1313
1314 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1315       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1316       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1317 #endif
1318
1319 #if !CMK_CHKP_ALL
1320       for (int idx=0; idx<len; idx++)
1321       {
1322         CkCheckPTInfo *entry = ckTable[idx];
1323 #if CK_NO_PROC_POOL
1324         // the bigger one will do 
1325         //    if (CkMyPe() < entry->pNo) continue;
1326         if (!isMaster(entry->pNo)) continue;
1327 #else
1328         // smaller one do it, which has the original object
1329         if (CkMyPe() == entry->pNo+1 || 
1330             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1331 #endif
1332         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1333
1334         entry->updateBuddy(CkMyPe(), entry->pNo);
1335         CkArrayCheckPTMessage *msg = entry->getCopy();
1336         // gzheng
1337         //thisProxy[CkMyPe()].inmem_restore(msg);
1338         inmem_restore(msg);
1339 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1340         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1341         int homePe = mgr->homePe(msg->index);
1342         if (homePe != CkMyPe()) {
1343           gmap[homePe].push_back(msg->locMgr);
1344           imap[homePe].push_back(msg->index);
1345         }
1346 #endif
1347         CkFreeMsg(msg);
1348         count ++;
1349       }
1350 #else
1351       char * packData;
1352       if(CmiNumPartition()==1){
1353         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1354         packData = (char *)msg->packData;
1355       }
1356       else{
1357         int pointer = CpvAccess(curPointer);
1358         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1359         packData = msg->packData;
1360       }
1361 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1362       recoverAll(packData,gmap,imap);
1363 #else
1364       recoverAll(packData);
1365 #endif
1366 #endif
1367 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1368       for (int i=0; i<CkNumPes(); i++) {
1369         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1370           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1371           flag++;       
1372         }
1373       }
1374       delete [] imap;
1375       delete [] gmap;
1376 #endif
1377       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1378
1379       CKLOCMGR_LOOP(mgr->doneInserting(););
1380
1381       // _crashedNode = -1;
1382       CpvAccess(_crashedNode) = -1;
1383 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1384       if (CkMyPe() == 0)
1385         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1386 #else
1387       if(flag == 0)
1388       {
1389         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1390       }
1391 #endif
1392     }
1393
1394     void CkMemCheckPT::gotReply(){
1395       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1396     }
1397
1398     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1399 #if CMK_CHKP_ALL
1400       PUP::fromMem p(packData);
1401       int numElements;
1402       p|numElements;
1403       if(p.isUnpacking()){
1404         for(int i=0;i<numElements;i++){
1405           CkGroupID gID;
1406           CkArrayIndex idx;
1407           p|gID;
1408           p|idx;
1409           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1410           int homePe = mgr->homePe(idx);
1411 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1412           mgr->resume(idx,p,CmiTrue,CmiTrue);
1413 #else
1414           if(CmiNumPartition()==1)
1415             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1416           else{
1417             if(CkMyPe()==thisFailedPe){
1418               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1419             }
1420             else{
1421               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1422             }
1423           }
1424 #endif
1425 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1426           homePe = mgr->homePe(idx);
1427           if (homePe != CkMyPe()) {
1428             gmap[homePe].push_back(gID);
1429             imap[homePe].push_back(idx);
1430           }
1431 #endif
1432         }
1433       }
1434 #endif
1435     }
1436
1437
1438     // on every processor
1439     // turn load balancer back on
1440     void CkMemCheckPT::finishUp()
1441     {
1442       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1443       //CKLOCMGR_LOOP(mgr->doneInserting(););
1444
1445       if (CkMyPe() == thisFailedPe)
1446       {
1447         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1448         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1449         fflush(stdout);
1450       }
1451       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1452       inRestarting = 0;
1453
1454 #if CMK_CONVERSE_MPI    
1455       if(CmiNumPartition()!=1){
1456         CpvAccess(recvdProcChkp) = 0;
1457         CpvAccess(recvdArrayChkp) = 0;
1458         CpvAccess(curPointer)^=1;
1459         //notify my replica, restart is done
1460         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1461         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1462         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1463       }
1464       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1465         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1466       }
1467 #endif
1468
1469 #if CK_NO_PROC_POOL
1470 #if NODE_CHECKPOINT
1471       int numnodes = CmiNumPhysicalNodes();
1472 #else
1473       int numnodes = CkNumPes();
1474 #endif
1475       if (numnodes-totalFailed() <=2) {
1476         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1477         _memChkptOn = 0;
1478       }
1479 #endif
1480     }
1481
1482     void CkMemCheckPT::recoverFromSoftFailure()
1483     {
1484       inRestarting = 0;
1485       CpvAccess(recvdRemote) = 0;
1486       CpvAccess(recvdLocal) = 0;
1487       CpvAccess(localChkpDone) = 0;
1488       CpvAccess(remoteChkpDone) = 0;
1489       inCheckpointing = 0;
1490       notifyReplica = 0;
1491       if(CkMyPe() == 0){
1492         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1493       }
1494       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1495     }
1496     // called only on 0
1497     void CkMemCheckPT::quiescence(CkCallback &cb)
1498     {
1499       static int pe_count = 0;
1500       pe_count ++;
1501       CmiAssert(CkMyPe() == 0);
1502       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1503       if (pe_count == CkNumPes()) {
1504         pe_count = 0;
1505         cb.send();
1506       }
1507     }
1508
1509     // User callable function - to start a checkpoint
1510     // callback cb is used to pass control back
1511     void CkStartMemCheckpoint(CkCallback &cb)
1512     {
1513 #if CMK_MEM_CHECKPOINT
1514       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1515       /*if (_memChkptOn == 0) {
1516         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1517         cb.send();
1518         return;
1519         }*/
1520       if (CkInRestarting()) {
1521         // trying to checkpointing during restart
1522         cb.send();
1523         return;
1524       }
1525       // store user callback and user data
1526       CkMemCheckPT::cpCallback = cb;
1527
1528       // broadcast to start check pointing
1529       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1530       checkptMgr.doItNow(CkMyPe(), cb);
1531 #else
1532       // when mem checkpoint is disabled, invike cb immediately
1533       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1534       cb.send();
1535 #endif
1536     }
1537
1538     void CkRestartCheckPoint(int diePe)
1539     {
1540       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1541       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1542       // broadcast
1543       checkptMgr.restart(diePe);
1544     }
1545
1546     static int _diePE = -1;
1547
1548     // callback function used locally by ccs handler
1549     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1550     {
1551       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1552       CkRestartCheckPoint(_diePE);
1553     }
1554
1555
1556     // called on crashed PE
1557     static void restartBeginHandler(char *msg)
1558     {
1559       CmiFree(msg);
1560 #if CMK_MEM_CHECKPOINT
1561 #if CMK_USE_BARRIER
1562       if(CkMyPe()!=_diePE){
1563         printf("restar begin on %d\n",CkMyPe());
1564         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1565         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1566         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1567       }else{
1568         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1569         CkRestartCheckPointCallback(NULL, NULL);
1570       }
1571 #else
1572       static int count = 0;
1573       CmiAssert(CkMyPe() == _diePE);
1574       count ++;
1575       if (count == CkNumPes()) {
1576         printf("restart begin on %d\n",CkMyPe());
1577         CkRestartCheckPointCallback(NULL, NULL);
1578         count = 0;
1579       }
1580 #endif
1581 #endif
1582     }
1583
1584     extern void _discard_charm_message();
1585     extern void _resume_charm_message();
1586
1587     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1588       return data;
1589     }
1590
1591     static void restartBcastHandler(char *msg)
1592     {
1593 #if CMK_MEM_CHECKPOINT
1594       // advance phase counter
1595       CkMemCheckPT::inRestarting = 1;
1596       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1597       // gzheng
1598       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1599
1600       if (CkMyPe()==_diePE)
1601         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1602
1603       // reset QD counters
1604       /*  gzheng
1605           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1606        */
1607
1608       /*  gzheng
1609           if (CkMyPe()==_diePE)
1610           CkRestartCheckPointCallback(NULL, NULL);
1611        */
1612       CmiFree(msg);
1613
1614       _resume_charm_message();
1615
1616       // reduction
1617       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1618       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1619 #if CMK_USE_BARRIER
1620       //CmiPrintf("before reduce\n");   
1621       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1622       //CmiPrintf("after reduce\n");    
1623 #else
1624       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1625 #endif 
1626       checkpointed = 0;
1627 #endif
1628     }
1629
1630     extern void _initDone();
1631
1632     bool compare(char * buf1, char *buf2){
1633       PUP::checker pchecker(buf1,buf2);
1634       pchecker.skip();
1635       int numElements;
1636       pchecker|numElements;
1637       for(int i=0;i<numElements;i++){
1638         CkGroupID gID;
1639         CkArrayIndex idx;
1640
1641         pchecker|gID;
1642         pchecker|idx;
1643
1644         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1645         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1646       }
1647       return pchecker.getResult();
1648       //return true;
1649     }
1650     int getChecksum(char * buf){
1651       PUP::checker pchecker(buf);
1652       pchecker.skip();
1653       int numElements;
1654       pchecker|numElements;
1655 //      CkPrintf("num %d\n",numElements);
1656       for(int i=0;i<numElements;i++){
1657         CkGroupID gID;
1658         CkArrayIndex idx;
1659
1660         pchecker|gID;
1661         pchecker|idx;
1662
1663         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1664         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1665       }
1666       return pchecker.getChecksum();
1667     }
1668
1669     static void recvRemoteChkpHandler(char *msg){
1670 #if CMK_USE_CHECKSUM
1671       CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1672       CpvAccess(recvdRemote) = 1;
1673       if(CpvAccess(recvdLocal)==1){
1674         if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1675           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1676         }
1677         else{
1678           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1679         }
1680       }
1681 #else
1682       envelope *env = (envelope *)msg;
1683       CkUnpackMessage(&env);
1684       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1685       if(CpvAccess(recvdLocal)==1){
1686         int pointer = CpvAccess(curPointer);
1687         int size = CpvAccess(chkpBuf)[pointer]->len;
1688         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1689           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1690         }else
1691         {
1692           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1693         }
1694         delete chkpMsg;
1695         if(CkMyPe()==0)
1696           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1697       }else{
1698         CpvAccess(recvdRemote) = 1;
1699         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1700         CpvAccess(buddyBuf) = chkpMsg;
1701       }
1702 #endif
1703     }
1704
1705     static void replicaRecoverHandler(char *msg){
1706       CpvAccess(_remoteCrashedNode) = -1;
1707       CkMemCheckPT::replicaAlive = 1;
1708       //fflush(stdout);
1709       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1710       bool ret = true;
1711       CpvAccess(remoteChkpDone) = 1;
1712       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1713       CmiFree(msg);
1714
1715     }
1716     static void replicaChkpDoneHandler(char *msg){
1717       CpvAccess(remoteChkpDone) = 1;
1718       if(CpvAccess(localChkpDone) == 1)
1719         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1720       CmiFree(msg);
1721     }
1722
1723     static void replicaDieHandler(char * msg){
1724 #if CMK_CONVERSE_MPI    
1725       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1726       CpvAccess(_remoteCrashedNode) = diePe;
1727       CkMemCheckPT::replicaAlive = 0;
1728       if(CkMyPe()==diePe){
1729         CmiPrintf("pe %d in replicad word die\n",diePe);
1730         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1731         fflush(stdout);
1732       }
1733       find_spare_mpirank(diePe,CmiMyPartition()^1);
1734 #endif
1735       //broadcast to my partition to get local max iter
1736       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1737       CmiFree(msg);
1738     }
1739
1740
1741     static void replicaDieBcastHandler(char *msg){
1742       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1743       CpvAccess(_remoteCrashedNode) = diePe;
1744       CkMemCheckPT::replicaAlive = 0;
1745       CmiFree(msg);
1746     }
1747
1748     static void recoverRemoteProcDataHandler(char *msg){
1749       envelope *env = (envelope *)msg;
1750       CkUnpackMessage(&env);
1751       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1752
1753       //store the checkpoint
1754       int pointer = procMsg->pointer;
1755
1756
1757       if(CkMyPe()==CpvAccess(_crashedNode)){
1758         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1759         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1760         PUP::fromMem p(procMsg->packData);
1761         _handleProcData(p,CmiTrue);
1762         _initDone();
1763       }
1764       else{
1765         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1766         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1767         //_handleProcData(p,CmiFalse);
1768       }
1769       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1770       CKLOCMGR_LOOP(mgr->startInserting(););
1771
1772       CpvAccess(recvdProcChkp) =1;
1773       if(CpvAccess(recvdArrayChkp)==1){
1774         _resume_charm_message();
1775         _diePE = CpvAccess(_crashedNode);
1776         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1777         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1778         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1779       }
1780     }
1781
1782     static void recoverRemoteArrayDataHandler(char *msg){
1783       envelope *env = (envelope *)msg;
1784       CkUnpackMessage(&env);
1785       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1786
1787       //store the checkpoint
1788       int pointer = chkpMsg->pointer;
1789       CpvAccess(curPointer) = pointer;
1790       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1791       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1792       CpvAccess(recvdArrayChkp) =1;
1793       CkMemCheckPT::inRestarting = 1;
1794       if(CpvAccess(recvdProcChkp) == 1){
1795         _resume_charm_message();
1796         _diePE = CpvAccess(_crashedNode);
1797         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1798         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1799         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1800         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1801       }
1802     }
1803
1804     static void recvPhaseHandler(char * msg)
1805     {
1806       CpvAccess(_curRestartPhase)--;
1807       CkMemCheckPT::inRestarting = 1;
1808       CmiFree(msg);
1809     }
1810     // called on crashed processor
1811     static void recoverProcDataHandler(char *msg)
1812     {
1813 #if CMK_MEM_CHECKPOINT
1814       int i;
1815       envelope *env = (envelope *)msg;
1816       CkUnpackMessage(&env);
1817       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1818       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1819       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1820       PUP::fromMem p(procMsg->packData);
1821       _handleProcData(p,CmiTrue);
1822
1823       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1824       // gzheng
1825       CKLOCMGR_LOOP(mgr->startInserting(););
1826
1827       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1828       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1829       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1830       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1831
1832       _initDone();
1833       //   CpvAccess(_qd)->flushStates();
1834       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1835 #endif
1836     }
1837     //for replica, got the phase number from my neighbor processor in the current partition
1838     static void askPhaseHandler(char *msg)
1839     {
1840 #if CMK_MEM_CHECKPOINT
1841       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1842       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1843       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1844       CmiSetHandler(msg, recvPhaseHandlerIdx);
1845       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1846 #endif
1847     }
1848     // called on its backup processor
1849     // get backup message buffer and sent to crashed processor
1850     static void askProcDataHandler(char *msg)
1851     {
1852 #if CMK_MEM_CHECKPOINT
1853       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1854       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1855       if (CpvAccess(procChkptBuf) == NULL)  {
1856         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1857         CkAbort("no checkpoint found");
1858       }
1859       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1860       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1861
1862       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1863
1864       CkPackMessage(&env);
1865       CmiSetHandler(env, recoverProcDataHandlerIdx);
1866       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1867       CpvAccess(procChkptBuf) = NULL;
1868       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1869 #endif
1870     }
1871
1872     // called on PE 0
1873     void qd_callback(void *m)
1874     {
1875       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1876       fflush(stdout);
1877       CkFreeMsg(m);
1878       if(CmiNumPartition()==1){
1879 #ifdef CMK_SMP
1880         for(int i=0;i<CmiMyNodeSize();i++){
1881           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1882           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1883           CmiSetHandler(msg, askProcDataHandlerIdx);
1884           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1885           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1886         }
1887         return;
1888 #endif
1889         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1890         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1891         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1892         CmiSetHandler(msg, askProcDataHandlerIdx);
1893         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1894         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1895       }
1896       else{
1897         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1898         CmiSetHandler(msg, recvPhaseHandlerIdx);
1899         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1900       }
1901     }
1902
1903     // on crashed node
1904     void CkMemRestart(const char *dummy, CkArgMsg *args)
1905     {
1906 #if CMK_MEM_CHECKPOINT
1907       _diePE = CmiMyNode();
1908       CpvAccess( _crashedNode )= CmiMyNode();
1909       CkMemCheckPT::inRestarting = 1;
1910       _discard_charm_message();
1911
1912       /*if(CmiMyRank()==0){
1913         CkCallback cb(qd_callback);
1914         CkStartQD(cb);
1915         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1916         }*/
1917       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1918       if(CmiNumPartition()==1){
1919         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1920         restartT = CmiWallTimer();
1921         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1922         fflush(stdout);
1923         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1924         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1925         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1926         CmiSetHandler(msg, askProcDataHandlerIdx);
1927         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1928         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1929       }
1930       else{
1931         CkCallback cb(qd_callback);
1932         CkStartQD(cb);
1933       }
1934 #else
1935       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1936 #endif
1937     }
1938
1939     // can be called in other files
1940     // return true if it is in restarting
1941     extern "C"
1942       int CkInRestarting()
1943       {
1944 #if CMK_MEM_CHECKPOINT
1945         if (CpvAccess( _crashedNode)!=-1) return 1;
1946         // gzheng
1947         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1948         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1949         return CkMemCheckPT::inRestarting;
1950 #else
1951         return 0;
1952 #endif
1953       }
1954
1955     extern "C"
1956       int CkReplicaAlive()
1957       {
1958         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1959         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1960         return CkMemCheckPT::replicaAlive;
1961
1962         /*if(CkMemCheckPT::replicaDead==1)
1963           return 0;
1964           else          
1965           return 1;*/
1966       }
1967
1968     extern "C"
1969       int CkInCheckpointing()
1970       {
1971         return CkMemCheckPT::inCheckpointing;
1972       }
1973
1974     extern "C"
1975       void CkSetInLdb(){
1976 #if CMK_MEM_CHECKPOINT
1977         CkMemCheckPT::inLoadbalancing = 1;
1978 #endif
1979       }
1980
1981     extern "C"
1982       int CkInLdb(){
1983 #if CMK_MEM_CHECKPOINT
1984         return CkMemCheckPT::inLoadbalancing;
1985 #endif
1986         return 0;
1987       }
1988
1989     extern "C"
1990       void CkResetInLdb(){
1991 #if CMK_MEM_CHECKPOINT
1992         CkMemCheckPT::inLoadbalancing = 0;
1993 #endif
1994       }
1995
1996     /*****************************************************************************
1997       module initialization
1998      *****************************************************************************/
1999
2000     static int arg_where = CkCheckPoint_inMEM;
2001
2002 #if CMK_MEM_CHECKPOINT
2003     void init_memcheckpt(char **argv)
2004     {
2005       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2006         arg_where = CkCheckPoint_inDISK;
2007       }
2008       // initiliazing _crashedNode variable
2009       CpvInitialize(int, _crashedNode);
2010       CpvInitialize(int, _remoteCrashedNode);
2011       CpvAccess(_crashedNode) = -1;
2012       CpvAccess(_remoteCrashedNode) = -1;
2013       init_FI(argv);
2014     }
2015 #endif
2016
2017     class CkMemCheckPTInit: public Chare {
2018       public:
2019         CkMemCheckPTInit(CkArgMsg *m) {
2020 #if CMK_MEM_CHECKPOINT
2021           if (arg_where == CkCheckPoint_inDISK) {
2022             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2023           }
2024           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2025           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2026 #endif
2027         }
2028     };
2029
2030     static void notifyHandler(char *msg)
2031     {
2032 #if CMK_MEM_CHECKPOINT
2033       CmiFree(msg);
2034       /* immediately increase restart phase to filter old messages */
2035       CpvAccess(_curRestartPhase) ++;
2036       CpvAccess(_qd)->flushStates();
2037       _discard_charm_message();
2038
2039 #endif
2040     }
2041
2042     extern "C"
2043       void notify_crash(int node)
2044       {
2045 #ifdef CMK_MEM_CHECKPOINT
2046         CpvAccess( _crashedNode) = node;
2047 #ifdef CMK_SMP
2048         for(int i=0;i<CkMyNodeSize();i++){
2049           CpvAccessOther(_crashedNode,i)=node;
2050         }
2051 #endif
2052         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2053         CkMemCheckPT::inRestarting = 1;
2054
2055         // this may be in interrupt handler, send a message to reset QD
2056         int pe = CmiNodeFirst(CkMyNode());
2057         for(int i=0;i<CkMyNodeSize();i++){
2058           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2059           CmiSetHandler(msg, notifyHandlerIdx);
2060           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2061         }
2062 #endif
2063       }
2064
2065     extern "C" void (*notify_crash_fn)(int node);
2066
2067
2068 #if CMK_CONVERSE_MPI
2069     void buddyDieHandler(char *msg)
2070     {
2071 #if CMK_MEM_CHECKPOINT
2072       // notify
2073       CkMemCheckPT::inRestarting = 1;
2074       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2075       notify_crash(diepe);
2076       // send message to crash pe to let it restart
2077       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2078       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2079       int buddy = obj->BuddyPE(CmiMyPe());
2080       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2081       if (buddy == diepe)  {
2082         mpi_restart_crashed(diepe, newrank);
2083       }
2084 #endif
2085     }
2086
2087     void pingHandler(void *msg)
2088     {
2089       lastPingTime = CmiWallTimer();
2090       CmiFree(msg);
2091     }
2092
2093     void pingCheckHandler()
2094     {
2095 #if CMK_MEM_CHECKPOINT
2096       double now = CmiWallTimer();
2097       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2098         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2099         int i, pe, buddy;
2100         // tell everyone the buddy dies
2101         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2102         for (i = 1; i < CmiNumPes(); i++) {
2103           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2104           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2105         }
2106         buddy = pe;
2107         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2108         fflush(stdout);
2109         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2110         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2111         CmiSetHandler(msg, buddyDieHandlerIdx);
2112         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2113         //send to everyone in the other world
2114         if(CmiNumPartition()!=1){
2115           for(int i=0;i<CmiNumPes();i++){
2116             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2117             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2118             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2119             //fflush(stdout);
2120             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2121             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2122           }
2123         }
2124       }
2125         else 
2126           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2127 #endif
2128       }
2129
2130       void pingBuddy()
2131       {
2132 #if CMK_MEM_CHECKPOINT
2133         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2134         if (obj) {
2135           int buddy = obj->BuddyPE(CkMyPe());
2136           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2137           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2138           CmiSetHandler(msg, pingHandlerIdx);
2139           CmiGetRestartPhase(msg) = 9999;
2140           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2141         }
2142         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2143 #endif
2144       }
2145 #endif
2146
2147       // initproc
2148       void CkRegisterRestartHandler( )
2149       {
2150 #if CMK_MEM_CHECKPOINT
2151         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2152         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2153         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2154         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2155         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2156
2157         //for replica
2158         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2159         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2160         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2161         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2162         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2163         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2164         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2165         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2166         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2167
2168 #if CMK_CONVERSE_MPI
2169         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2170         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2171         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2172 #endif
2173
2174         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2175         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2176         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2177         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2178         CpvInitialize(int,curPointer);
2179         CpvInitialize(int,recvdLocal);
2180         CpvInitialize(int,localChkpDone);
2181         CpvInitialize(int,remoteChkpDone);
2182         CpvInitialize(int,recvdRemote);
2183         CpvInitialize(int,recvdProcChkp);
2184         CpvInitialize(int,localChecksum);
2185         CpvInitialize(int,remoteChecksum);
2186         CpvInitialize(int,recvdArrayChkp);
2187
2188         CpvAccess(procChkptBuf) = NULL;
2189         CpvAccess(buddyBuf) = NULL;
2190         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2191         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2192         CpvAccess(chkpBuf)[0] = NULL;
2193         CpvAccess(chkpBuf)[1] = NULL;
2194         CpvAccess(localProcChkpBuf)[0] = NULL;
2195         CpvAccess(localProcChkpBuf)[1] = NULL;
2196
2197         CpvAccess(curPointer) = 0;
2198         CpvAccess(recvdLocal) = 0;
2199         CpvAccess(localChkpDone) = 0;
2200         CpvAccess(remoteChkpDone) = 0;
2201         CpvAccess(recvdRemote) = 0;
2202         CpvAccess(recvdProcChkp) = 0;
2203         CpvAccess(localChecksum) = 0;
2204         CpvAccess(remoteChecksum) = 0;
2205         CpvAccess(recvdArrayChkp) = 0;
2206
2207         notify_crash_fn = notify_crash;
2208
2209 #if ! CMK_CONVERSE_MPI
2210         // print pid to kill
2211         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2212         //  sleep(4);
2213 #endif
2214 #endif
2215       }
2216
2217
2218       extern "C"
2219         int CkHasCheckpoints()
2220         {
2221           return checkpointed;
2222         }
2223
2224       /// @todo: the following definitions should be moved to a separate file containing
2225       // structures and functions about fault tolerance strategies
2226
2227       /**
2228        *  * @brief: function for killing a process                                             
2229        *   */
2230 #ifdef CMK_MEM_CHECKPOINT
2231 #if CMK_HAS_GETPID
2232       void killLocal(void *_dummy,double curWallTime){
2233         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2234         if(CmiWallTimer()<killTime-1){
2235           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2236         }else{ 
2237 #if CMK_CONVERSE_MPI
2238           CkDieNow();
2239 #else 
2240           kill(getpid(),SIGKILL);                                               
2241 #endif
2242         }              
2243       } 
2244 #else
2245       void killLocal(void *_dummy,double curWallTime){
2246         CmiAbort("kill() not supported!");
2247       }
2248 #endif
2249 #endif
2250
2251 #ifdef CMK_MEM_CHECKPOINT
2252       /**
2253        * @brief: reads the file with the kill information
2254        */
2255       void readKillFile(){
2256         FILE *fp=fopen(killFile,"r");
2257         if(!fp){
2258           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2259           return;
2260         }
2261         int proc;
2262         double sec;
2263         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2264           if(proc == CkMyNode() && CkMyRank() == 0){
2265             killTime = CmiWallTimer()+sec;
2266             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2267             CcdCallFnAfter(killLocal,NULL,sec*1000);
2268           }
2269         }
2270         fclose(fp);
2271       }
2272
2273 #if ! CMK_CONVERSE_MPI
2274       void CkDieNow()
2275       {
2276 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2277         // ignored for non-mpi version
2278         CmiPrintf("[%d] die now.\n", CmiMyPe());
2279         killTime = CmiWallTimer()+0.001;
2280         CcdCallFnAfter(killLocal,NULL,1);
2281 #endif
2282       }
2283 #endif
2284
2285 #endif
2286
2287 #include "CkMemCheckpoint.def.h"
2288
2289