changes for fault injector
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, _remoteCrashedNode);
78
79 // static, so that it is accessible from Converse part
80 int CkMemCheckPT::inRestarting = 0;
81 int CkMemCheckPT::inCheckpointing = 0;
82 int CkMemCheckPT::replicaAlive = 1;
83 int CkMemCheckPT::inLoadbalancing = 0;
84 double CkMemCheckPT::startTime;
85 char *CkMemCheckPT::stage;
86 CkCallback CkMemCheckPT::cpCallback;
87
88 int _memChkptOn = 1;                    // checkpoint is on or off
89
90 CkGroupID ckCheckPTGroupID;             // readonly
91
92 static int checkpointed = 0;
93
94 /// @todo the following declarations should be moved into a separate file for all 
95 // fault tolerant strategies
96
97 #ifdef CMK_MEM_CHECKPOINT
98 // name of the kill file that contains processes to be killed 
99 char *killFile;                                               
100 // flag for the kill file         
101 int killFlag=0;
102 // variable for storing the killing time
103 double killTime=0.0;
104 #endif
105
106 #ifdef CKLOCMGR_LOOP
107 #undef CKLOCMGR_LOOP
108 #endif
109 // loop over all CkLocMgr and do "code"
110 #define  CKLOCMGR_LOOP(code)    {       \
111   int numGroups = CkpvAccess(_groupIDTable)->size();    \
112   for(int i=0;i<numGroups;i++) {        \
113     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
114     if(obj->isLocMgr())  {      \
115       CkLocMgr *mgr = (CkLocMgr*)obj;   \
116       code      \
117     }   \
118   }     \
119 }
120
121 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
122 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
123 CpvDeclare(CkCheckPTMessage**, chkpBuf);
124 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
125 //store the checkpoint of the buddy to compare
126 //do not need the whole msg, can be the checksum
127 CpvDeclare(CkCheckPTMessage*, buddyBuf);
128 //pointer of the checkpoint going to be written
129 CpvDeclare(int, curPointer);
130 CpvDeclare(int, recvdRemote);
131 CpvDeclare(int, recvdLocal);
132 CpvDeclare(int, localChkpDone);
133 CpvDeclare(int, remoteChkpDone);
134 CpvDeclare(int, recvdArrayChkp);
135 CpvDeclare(int, recvdProcChkp);
136 CpvDeclare(int, localChecksum);
137 CpvDeclare(int, remoteChecksum);
138
139 bool compare(char * buf1, char * buf2);
140 int getChecksum(char * buf);
141 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
142 // Converse function handles
143 static int askPhaseHandlerIdx;
144 static int recvPhaseHandlerIdx;
145 static int askProcDataHandlerIdx;
146 static int restartBcastHandlerIdx;
147 static int recoverProcDataHandlerIdx;
148 static int restartBeginHandlerIdx;
149 static int recvRemoteChkpHandlerIdx;
150 static int replicaDieHandlerIdx;
151 static int replicaDieBcastHandlerIdx;
152 static int replicaRecoverHandlerIdx;
153 static int replicaChkpDoneHandlerIdx;
154 static int recoverRemoteProcDataHandlerIdx;
155 static int recoverRemoteArrayDataHandlerIdx;
156 static int notifyHandlerIdx;
157 // compute the backup processor
158 // FIXME: avoid crashed processors
159 #if CMK_CONVERSE_MPI
160 static int pingHandlerIdx;
161 static int pingCheckHandlerIdx;
162 static int buddyDieHandlerIdx;
163 static double lastPingTime = -1;
164
165 extern "C" void mpi_restart_crashed(int pe, int rank);
166 extern "C" int  find_spare_mpirank(int pe,int partition);
167
168 void pingBuddy();
169 void pingCheckHandler();
170
171 #endif
172 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
173
174 inline int CkMemCheckPT::BuddyPE(int pe)
175 {
176   int budpe;
177 #if NODE_CHECKPOINT
178   // buddy is the processor with same rank on the next physical node
179   int r1 = CmiPhysicalRank(pe);
180   int budnode = CmiPhysicalNodeID(pe);
181   do {
182     budnode = (budnode+1)%CmiNumPhysicalNodes();
183     int *pelist;
184     int num;
185     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
186     budpe = pelist[r1 % num];
187   } while (isFailed(budpe));
188   if (budpe == pe) {
189     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
190     CmiAbort("Failed to find a buddy processor");
191   }
192 #else
193   budpe = pe;
194   budpe = (budpe+1)%CkNumPes();
195 #endif
196   return budpe;
197 }
198
199 // called in array element constructor
200 // choose and register with 2 buddies for checkpoiting 
201 #if CMK_MEM_CHECKPOINT
202 void ArrayElement::init_checkpt() {
203   if (_memChkptOn == 0) return;
204   if (CkInRestarting()) {
205     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
206   }
207   // only master init checkpoint
208   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
209
210   budPEs[0] = CkMyPe();
211   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
212   CmiAssert(budPEs[0] != budPEs[1]);
213   // inform checkPTMgr
214   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
215   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
216   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
217   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
218 }
219 #endif
220
221 // entry function invoked by checkpoint mgr asking for checkpoint data
222 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
223 #if CMK_MEM_CHECKPOINT
224   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
225   //char index[128];   thisIndexMax.sprint(index);
226   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
227   CkLocMgr *locMgr = thisArray->getLocMgr();
228   CmiAssert(myRec!=NULL);
229   int size;
230   {
231     PUP::sizer p;
232     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
233     size = p.size();
234   }
235   int packSize = size/sizeof(double) +1;
236   CkArrayCheckPTMessage *msg =
237     new (packSize, 0) CkArrayCheckPTMessage;
238   msg->len = size;
239   msg->index =thisIndexMax;
240   msg->aid = thisArrayID;
241   msg->locMgr = locMgr->getGroupID();
242   msg->cp_flag = 1;
243   {
244     PUP::toMem p(msg->packData);
245     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
246   }
247
248   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
249   checkptMgr.recvData(msg, 2, budPEs);
250   delete m;
251 #endif
252 }
253
254 // checkpoint holder class - for memory checkpointing
255 class CkMemCheckPTInfo: public CkCheckPTInfo
256 {
257   CkArrayCheckPTMessage *ckBuffer;
258   public:
259   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
260     CkCheckPTInfo(a, loc, idx, pno)
261   {
262     ckBuffer = NULL;
263   }
264   ~CkMemCheckPTInfo() 
265   {
266     if (ckBuffer) delete ckBuffer; 
267   }
268   inline void updateBuffer(CkArrayCheckPTMessage *data) 
269   {
270     CmiAssert(data!=NULL);
271     if (ckBuffer) delete ckBuffer;
272     ckBuffer = data;
273   }    
274   inline CkArrayCheckPTMessage * getCopy()
275   {
276     if (ckBuffer == NULL) {
277       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
278       CmiAbort("Abort!");
279     }
280     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
281   }     
282   inline void updateBuddy(int b1, int b2) {
283     CmiAssert(ckBuffer);
284     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
285     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
286     CmiAssert(pNo != CkMyPe());
287   }
288   inline int getSize() { 
289     CmiAssert(ckBuffer);
290     return ckBuffer->len; 
291   }
292 };
293
294 // checkpoint holder class - for in-disk checkpointing
295 class CkDiskCheckPTInfo: public CkCheckPTInfo 
296 {
297   char *fname;
298   int bud1, bud2;
299   int len;                      // checkpoint size
300   public:
301   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
302   {
303 #if CMK_USE_MKSTEMP
304     fname = new char[64];
305     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
306     mkstemp(fname);
307 #else
308     fname=tmpnam(NULL);
309 #endif
310     bud1 = bud2 = -1;
311     len = 0;
312   }
313   ~CkDiskCheckPTInfo() 
314   {
315     remove(fname);
316   }
317   inline void updateBuffer(CkArrayCheckPTMessage *data) 
318   {
319     double t = CmiWallTimer();
320     // unpack it
321     envelope *env = UsrToEnv(data);
322     CkUnpackMessage(&env);
323     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
324     FILE *f = fopen(fname,"wb");
325     PUP::toDisk p(f);
326     CkPupMessage(p, (void **)&data);
327     // delay sync to the end because otherwise the messages are blocked
328     //    fsync(fileno(f));
329     fclose(f);
330     bud1 = data->bud1;
331     bud2 = data->bud2;
332     len = data->len;
333     delete data;
334     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
335   }
336   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
337   {
338     CkArrayCheckPTMessage *data;
339     FILE *f = fopen(fname,"rb");
340     PUP::fromDisk p(f);
341     CkPupMessage(p, (void **)&data);
342     fclose(f);
343     data->bud1 = bud1;                          // update the buddies
344     data->bud2 = bud2;
345     return data;
346   }
347   inline void updateBuddy(int b1, int b2) {
348     bud1 = b1; bud2 = b2;
349     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
350     CmiAssert(pNo != CkMyPe());
351   }
352   inline int getSize() { 
353     return len; 
354   }
355 };
356
357 CkMemCheckPT::CkMemCheckPT(int w)
358 {
359   int numnodes = 0;
360 #if NODE_CHECKPOINT
361   numnodes = CmiNumPhysicalNodes();
362 #else
363   numnodes = CkNumPes();
364 #endif
365 #if CK_NO_PROC_POOL
366   if (numnodes <= 2)
367 #else
368     if (numnodes  == 1)
369 #endif
370     {
371       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
372       _memChkptOn = 0;
373     }
374   inRestarting = 0;
375   inCheckpointing = 0;
376   recvCount = peCount = 0;
377   ackCount = 0;
378   expectCount = -1;
379   where = w;
380   replicaAlive = 1;
381   notifyReplica = 0;
382 #if CMK_CONVERSE_MPI
383   void pingBuddy();
384   void pingCheckHandler();
385   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
386   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
387 #endif
388   chkpTable[0] = NULL;
389   chkpTable[1] = NULL;
390   maxIter = -1;
391   recvIterCount = 0;
392   localDecided = false;
393 }
394
395 CkMemCheckPT::~CkMemCheckPT()
396 {
397   int len = ckTable.length();
398   for (int i=0; i<len; i++) {
399     delete ckTable[i];
400   }
401 }
402
403 void CkMemCheckPT::pup(PUP::er& p) 
404
405   CBase_CkMemCheckPT::pup(p); 
406   p|cpStarter;
407   p|thisFailedPe;
408   p|failedPes;
409   p|ckCheckPTGroupID;           // recover global variable
410   p|cpCallback;                 // store callback
411   p|where;                      // where to checkpoint
412   p|peCount;
413   if (p.isUnpacking()) {
414     recvCount = 0;
415 #if CMK_CONVERSE_MPI
416     void pingBuddy();
417     void pingCheckHandler();
418     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
419     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
420 #endif
421     maxIter = -1;
422     recvIterCount = 0;
423     localDecided = false;
424   }
425 }
426
427 void CkMemCheckPT::getIter(){
428   localDecided = true;
429   localMaxIter = maxIter+1;
430   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
431   int elemCount = CkCountChkpSyncElements();
432   if(elemCount == 0){
433     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
434   }
435 }
436
437 //when one replica failes, decide the next chkp iter
438
439 void CkMemCheckPT::recvIter(int iter){
440   if(maxIter<iter){
441     maxIter=iter;
442   }
443 }
444
445 void CkMemCheckPT::recvMaxIter(int iter){
446   localDecided = false;
447   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
448 }
449
450 void CkMemCheckPT::reachChkpIter(){
451   recvIterCount++;
452   elemCount = CkCountChkpSyncElements();
453   if(recvIterCount == elemCount){
454     recvIterCount = 0;
455     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
456   }
457 }
458
459 void CkMemCheckPT::startChkp(){
460   CkPrintf("start checkpoint\n");
461   CkStartMemCheckpoint(cpCallback);
462 }
463
464 // called by checkpoint mgr to restore an array element
465 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
466 {
467 #if CMK_MEM_CHECKPOINT
468   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
469   // m->index.print();
470   PUP::fromMem p(m->packData);
471   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
472   CmiAssert(mgr);
473 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
474   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
475 #else
476   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
477 #endif
478
479   // find a list of array elements bound together
480   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
481   CmiAssert(elt);
482   CkLocRec_local *rec = elt->myRec;
483   CkVec<CkMigratable *> list;
484   mgr->migratableList(rec, list);
485   CmiAssert(list.length() > 0);
486   for (int l=0; l<list.length(); l++) {
487     elt = (ArrayElement *)list[l];
488     elt->budPEs[0] = m->bud1;
489     elt->budPEs[1] = m->bud2;
490     //    reset, may not needed now
491     // for now.
492     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
493       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
494       if (c) c->redNo = 0;
495     }
496   }
497 #endif
498 }
499
500 // return 1 if pe is a crashed processor
501 int CkMemCheckPT::isFailed(int pe)
502 {
503   for (int i=0; i<failedPes.length(); i++)
504     if (failedPes[i] == pe) return 1;
505   return 0;
506 }
507
508 // add pe into history list of all failed processors
509 void CkMemCheckPT::failed(int pe)
510 {
511   if (isFailed(pe)) return;
512   failedPes.push_back(pe);
513 }
514
515 int CkMemCheckPT::totalFailed()
516 {
517   return failedPes.length();
518 }
519
520 // create an checkpoint entry for array element of aid with index.
521 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
522 {
523   // error check, no duplicate
524   int idx, len = ckTable.size();
525   for (idx=0; idx<len; idx++) {
526     CkCheckPTInfo *entry = ckTable[idx];
527     if (index == entry->index) {
528       if (loc == entry->locMgr) {
529         // bindTo array elements
530         return;
531       }
532       // for array inheritance, the following check may fail
533       // because ArrayElement constructor of all superclasses are called
534       if (aid == entry->aid) {
535         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
536         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
537       }
538     }
539   }
540   CkCheckPTInfo *newEntry;
541   if (where == CkCheckPoint_inMEM)
542     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
543   else
544     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
545   ckTable.push_back(newEntry);
546   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
547 }
548
549 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
550 {
551 #if !CMK_CHKP_ALL       
552   int buddy = msg->bud1;
553   if (buddy == CkMyPe()) buddy = msg->bud2;
554   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
555   recvData(msg);
556   // ack
557   thisProxy[buddy].gotData();
558 #else
559   chkpTable[0] = NULL;
560   chkpTable[1] = NULL;
561   recvArrayCheckpoint(msg);
562   thisProxy[msg->bud2].gotData();
563 #endif
564 }
565
566 // loop through my checkpoint table and ask checkpointed array elements
567 // to send me checkpoint data.
568 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
569 {
570   checkpointed = 1;
571   cpCallback = cb;
572   cpStarter = starter;
573   inCheckpointing = 1;
574   if (CkMyPe() == cpStarter) {
575     startTime = CmiWallTimer();
576     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
577   }
578 #if CMK_CONVERSE_MPI
579   if(CmiNumPartition()==1)
580 #endif    
581   {
582
583 #if !CMK_CHKP_ALL
584     int len = ckTable.length();
585     for (int i=0; i<len; i++) {
586       CkCheckPTInfo *entry = ckTable[i];
587       // always let the bigger number processor send request
588       //if (CkMyPe() < entry->pNo) continue;
589       // always let the smaller number processor send request, may on same proc
590       if (!isMaster(entry->pNo)) continue;
591       // call inmem_checkpoint to the array element, ask it to send
592       // back checkpoint data via recvData().
593       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
594       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
595     }
596     // if my table is empty, then I am done
597     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
598 #else
599     startArrayCheckpoint();
600 #endif
601     sendProcData();
602   }
603 #if CMK_CONVERSE_MPI
604   else
605   {
606     startCheckpoint();
607   }
608 #endif
609   // pack and send proc level data
610 }
611
612 class MemElementPacker : public CkLocIterator{
613   private:
614     //    CkLocMgr *locMgr;
615     PUP::er &p;
616     std::map<CkHashCode,CkLocation> arrayMap;
617   public:
618     MemElementPacker(PUP::er &p_):p(p_){};
619     void addLocation(CkLocation &loc){
620       CkArrayIndexMax idx = loc.getIndex();
621       arrayMap[idx.hash()] = loc;
622     }
623     void writeCheckpoint(){
624       std::map<CkHashCode, CkLocation>::iterator it;
625       for(it = arrayMap.begin();it!=arrayMap.end();it++){
626         CkLocation loc = it->second;
627         CkLocMgr *locMgr = loc.getManager();
628         CkArrayIndexMax idx = loc.getIndex();
629         CkGroupID gID = locMgr->ckGetGroupID();
630         p|gID;
631         p|idx;
632         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
633       }
634     }
635 };
636
637 void pupAllElements(PUP::er &p){
638 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
639   int numElements;
640   if(!p.isUnpacking()){
641     numElements = CkCountArrayElements();
642   }
643   p | numElements;
644   if(!p.isUnpacking()){
645     MemElementPacker packer(p);
646     CKLOCMGR_LOOP(mgr->iterate(packer););
647     packer.writeCheckpoint();
648   }
649 #endif
650 }
651
652 void CkMemCheckPT::startArrayCheckpoint(){
653 #if CMK_CHKP_ALL
654   int size;
655   {
656     PUP::sizer psizer;
657     pupAllElements(psizer);
658     size = psizer.size();
659   }
660   int packSize = size/sizeof(double)+1;
661   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
662   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
663   msg->len = size;
664   msg->cp_flag = 1;
665   int budPEs[2];
666   msg->bud1=CkMyPe();
667   msg->bud2=ChkptOnPe(CkMyPe());
668   {
669     PUP::toMem p(msg->packData);
670     pupAllElements(p);
671   }
672   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
673   if(chkpTable[0]) delete chkpTable[0];
674   chkpTable[0] = msg;
675   //send the checkpoint to my 
676   recvCount++;
677 #endif
678 }
679
680 void CkMemCheckPT::startCheckpoint(){
681 #if CMK_CONVERSE_MPI
682   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
683     CkPrintf("in start checkpointing!!!!\n");
684   int size;
685   {
686     PUP::sizer p;
687     _handleProcData(p,CmiFalse);
688     size = p.size();
689   }
690   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
691   procMsg->len = size;
692   procMsg->cp_flag = 1;
693   {
694     PUP::toMem p(procMsg->packData);
695     _handleProcData(p,CmiFalse);
696   }
697   int pointer = CpvAccess(curPointer);
698   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
699   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
700
701   {
702     PUP::sizer psizer;
703     pupAllElements(psizer);
704     size = psizer.size();
705   }
706   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
707   msg->len = size;
708   msg->cp_flag = 1;
709   int checksum;
710   {
711 //#if CMK_USE_CHECKSUM
712     if(CpvAccess(use_checksum)){
713       PUP::checker p(msg->packData);
714       pupAllElements(p);
715       checksum = p.getChecksum();
716     }else{  
717 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
718 //#else 
719       PUP::toMem p(msg->packData);
720       pupAllElements(p);
721     }
722 //#endif
723   }
724   pointer = CpvAccess(curPointer);
725   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
726   CpvAccess(chkpBuf)[pointer] = msg;
727   if(CkMyPe()==0)
728     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
729   if(CkReplicaAlive()==1){
730     CpvAccess(recvdLocal) = 1;
731 //#if CMK_USE_CHECKSUM
732 //    CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
733 //    CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
734 //    delete tmpMsg;
735     if(CpvAccess(use_checksum)){
736       CpvAccess(localChecksum) = checksum;
737       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
738       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
739       CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
740       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
741     }else{
742 //#else
743       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
744       CkPackMessage(&env);
745       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
746       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
747     }
748 //#endif
749   }
750   if(CpvAccess(recvdRemote)==1){
751     //compare the checkpoint 
752     int size = CpvAccess(chkpBuf)[pointer]->len;
753 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
754 //#if CMK_USE_CHECKSUM
755     if(CpvAccess(use_checksum)){
756       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
757         thisProxy[CkMyPe()].doneComparison(true);
758       }
759       else{
760         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
761         thisProxy[CkMyPe()].doneComparison(false);
762       }
763     }else{
764 //#else
765       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
766         thisProxy[CkMyPe()].doneComparison(true);
767       }
768       else{
769         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
770         thisProxy[CkMyPe()].doneComparison(false);
771       }
772     }
773 //#endif
774     if(CkMyPe()==0)
775       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
776   }
777   else{
778     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
779       { 
780         int pointer = CpvAccess(curPointer);
781         //send the proc data
782         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
783         procMsg->pointer = pointer;
784         envelope * env = (envelope *)(UsrToEnv(procMsg));
785         CkPackMessage(&env);
786         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
787         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
788         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
789           CkPrintf("[%d] sendProcdata\n",CkMyPe());
790         }
791       }
792       //send the array checkpoint data
793       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
794       msg->pointer = CpvAccess(curPointer);
795       envelope * env = (envelope *)(UsrToEnv(msg));
796       CkPackMessage(&env);
797       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
798       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
799       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
800         CkPrintf("[%d] sendArraydata\n",CkMyPe());
801       //can continue work, no need to wait for my replica
802     }
803   }
804 #endif
805 }
806
807 void CkMemCheckPT::doneComparison(bool ret){
808   int _ret = 1;
809   if(!ret){
810     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
811     _ret = 0;
812   }else{
813     _ret = 1;
814   }
815   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
816   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
817 }
818
819 void CkMemCheckPT::doneRComparison(int ret){
820   //    if(CpvAccess(curPointer) == 0){
821   //if(ret==CkNumPes()){
822     CpvAccess(localChkpDone) = 1;
823     if(CpvAccess(remoteChkpDone) ==1){
824       thisProxy.doneBothComparison();
825     }
826     if(notifyReplica == 0){
827       //notify the replica am done
828       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
829       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
830       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
831       notifyReplica = 1;
832     }
833  // }
834  /* else{
835     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
836     startTime = CmiWallTimer();
837     thisProxy.RollBack();
838   }*/
839 }
840
841 void CkMemCheckPT::doneBothComparison(){
842   CpvAccess(recvdRemote) = 0;
843   CpvAccess(recvdLocal) = 0;
844   CpvAccess(localChkpDone) = 0;
845   CpvAccess(remoteChkpDone) = 0;
846   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
847   CpvAccess(curPointer)^=1;
848   inCheckpointing = 0;
849   notifyReplica = 0;
850   if(CkMyPe() == 0){
851     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
852   }
853   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
854 }
855
856 void CkMemCheckPT::RollBack(){
857   //restore group data
858   checkpointed = 0;
859   CkMemCheckPT::inRestarting = 1;
860   int pointer = CpvAccess(curPointer)^1;//use the previous one
861   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
862   PUP::fromMem p(chkpMsg->packData);    
863
864   //destroy array elements
865   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
866   int numGroups = CkpvAccess(_groupIDTable)->size();
867   for(int i=0;i<numGroups;i++) {
868     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
869     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
870     obj->flushStates();
871     obj->ckJustMigrated();
872   }
873   //restore array elements
874
875   int numElements;
876   p|numElements;
877
878   if(p.isUnpacking()){
879     for(int i=0;i<numElements;i++){
880       CkGroupID gID;
881       CkArrayIndex idx;
882       p|gID;
883       p|idx;
884       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
885       CmiAssert(mgr);
886       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
887     }
888     }
889     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
890     contribute(cb);
891   }
892
893   void CkMemCheckPT::notifyReplicaDie(int pe){
894     replicaAlive = 0;
895     CpvAccess(_remoteCrashedNode) = pe;
896   }
897
898   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
899   {
900 #if CMK_CHKP_ALL
901     int idx = 1;
902     if(msg->bud1 == CkMyPe()){
903       idx = 0;
904     }
905     int isChkpting = msg->cp_flag;
906     if(isChkpting == 1){
907       if(chkpTable[idx]) delete chkpTable[idx];
908     }
909     chkpTable[idx] = msg;
910     if(isChkpting){
911       recvCount++;
912       if(recvCount == 2){
913         if (where == CkCheckPoint_inMEM) {
914           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
915         }
916         else if (where == CkCheckPoint_inDISK) {
917           // another barrier for finalize the writing using fsync
918           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
919           contribute(0,NULL,CkReduction::sum_int,localcb);
920         }
921         else
922           CmiAbort("Unknown checkpoint scheme");
923         recvCount = 0;
924       }
925     }
926 #endif
927   }
928
929   // don't handle array elements
930   static inline void _handleProcData(PUP::er &p, CmiBool create)
931   {
932     // save readonlys, and callback BTW
933     CkPupROData(p);
934
935     // save mainchares 
936     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
937
938 #ifndef CMK_CHARE_USE_PTR
939     // save non-migratable chare
940     CkPupChareData(p);
941 #endif
942
943     // save groups into Groups.dat
944     CkPupGroupData(p,create);
945
946     // save nodegroups into NodeGroups.dat
947     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
948   }
949
950   void CkMemCheckPT::sendProcData()
951   {
952     // find out size of buffer
953     int size;
954     {
955       PUP::sizer p;
956       _handleProcData(p,CmiTrue);
957       size = p.size();
958     }
959     int packSize = size;
960     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
961     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
962     {
963       PUP::toMem p(msg->packData);
964       _handleProcData(p,CmiTrue);
965     }
966     msg->pe = CkMyPe();
967     msg->len = size;
968     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
969     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
970   }
971
972   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
973   {
974     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
975     CpvAccess(procChkptBuf) = msg;
976     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
977     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
978   }
979
980   // ArrayElement call this function to give us the checkpointed data
981   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
982   {
983     int len = ckTable.length();
984     int idx;
985     for (idx=0; idx<len; idx++) {
986       CkCheckPTInfo *entry = ckTable[idx];
987       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
988     }
989     CkAssert(idx < len);
990     int isChkpting = msg->cp_flag;
991     ckTable[idx]->updateBuffer(msg);
992     if (isChkpting) {
993       // all my array elements have returned their inmem data
994       // inform starter processor that I am done.
995       recvCount ++;
996       if (recvCount == ckTable.length()) {
997         if (where == CkCheckPoint_inMEM) {
998           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
999         }
1000         else if (where == CkCheckPoint_inDISK) {
1001           // another barrier for finalize the writing using fsync
1002           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1003           contribute(0,NULL,CkReduction::sum_int,localcb);
1004         }
1005         else
1006           CmiAbort("Unknown checkpoint scheme");
1007         recvCount = 0;
1008       } 
1009     }
1010   }
1011
1012   // only used in disk checkpointing
1013   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1014   {
1015     delete m;
1016 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1017     system("sync");
1018 #endif
1019     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1020   }
1021
1022   // only is called on cpStarter when checkpoint is done
1023   void CkMemCheckPT::cpFinish()
1024   {
1025     CmiAssert(CkMyPe() == cpStarter);
1026     peCount++;
1027     // now that all processors have finished, activate callback
1028     if (peCount == 2) 
1029     {
1030       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1031       peCount = 0;
1032       thisProxy.report();
1033     }
1034   }
1035
1036   // for debugging, report checkpoint info
1037   void CkMemCheckPT::report()
1038   {
1039     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1040     inCheckpointing = 0;
1041 #if !CMK_CHKP_ALL
1042     int objsize = 0;
1043     int len = ckTable.length();
1044     for (int i=0; i<len; i++) {
1045       CkCheckPTInfo *entry = ckTable[i];
1046       CmiAssert(entry);
1047       objsize += entry->getSize();
1048     }
1049     CmiAssert(CpvAccess(procChkptBuf));
1050     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1051 #else
1052     if(CkMyPe()==0)
1053       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1054 #endif
1055   }
1056
1057   /*****************************************************************************
1058     RESTART Procedure
1059    *****************************************************************************/
1060
1061   // master processor of two buddies
1062   inline int CkMemCheckPT::isMaster(int buddype)
1063   {
1064 #if 0
1065     int mype = CkMyPe();
1066     //CkPrintf("ismaster: %d %d\n", pe, mype);
1067     if (CkNumPes() - totalFailed() == 2) {
1068       return mype > buddype;
1069     }
1070     for (int i=1; i<CkNumPes(); i++) {
1071       int me = (buddype+i)%CkNumPes();
1072       if (isFailed(me)) continue;
1073       if (me == mype) return 1;
1074       else return 0;
1075     }
1076     return 0;
1077 #else
1078     // smaller one
1079     int mype = CkMyPe();
1080     //CkPrintf("ismaster: %d %d\n", pe, mype);
1081     if (CkNumPes() - totalFailed() == 2) {
1082       return mype < buddype;
1083     }
1084 #if NODE_CHECKPOINT
1085     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1086     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1087 #else
1088       for (int i=1; i<CkNumPes(); i++) {
1089 #endif
1090         int me = (mype+i)%CkNumPes();
1091         if (isFailed(me)) continue;
1092         if (me == buddype) return 1;
1093         else return 0;
1094       }
1095       return 0;
1096 #endif
1097     }
1098
1099
1100
1101 #if 0
1102     // helper class to pup all elements that belong to same ckLocMgr
1103     class ElementDestoryer : public CkLocIterator {
1104       private:
1105         CkLocMgr *locMgr;
1106       public:
1107         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1108         void addLocation(CkLocation &loc) {
1109           CkArrayIndex idx=loc.getIndex();
1110           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1111           loc.destroy();
1112         }
1113     };
1114 #endif
1115
1116     // restore the bitmap vector for LB
1117     void CkMemCheckPT::resetLB(int diepe)
1118     {
1119 #if CMK_LBDB_ON
1120       int i;
1121       char *bitmap = new char[CkNumPes()];
1122       // set processor available bitmap
1123       get_avail_vector(bitmap);
1124       for (i=0; i<failedPes.length(); i++)
1125         bitmap[failedPes[i]] = 0; 
1126       bitmap[diepe] = 0;
1127
1128 #if CK_NO_PROC_POOL
1129       set_avail_vector(bitmap);
1130 #endif
1131
1132       // if I am the crashed pe, rebuild my failedPEs array
1133       if (CkMyNode() == diepe)
1134         for (i=0; i<CkNumPes(); i++) 
1135           if (bitmap[i]==0) failed(i);
1136
1137       delete [] bitmap;
1138 #endif
1139     }
1140
1141     static double restartT;
1142
1143     // in case when failedPe dies, everybody go through its checkpoint table:
1144     // destory all array elements
1145     // recover lost buddies
1146     // reconstruct all array elements from check point data
1147     // called on all processors
1148     void CkMemCheckPT::restart(int diePe)
1149     {
1150 #if CMK_MEM_CHECKPOINT
1151       double curTime = CmiWallTimer();
1152       if (CkMyPe() == diePe){
1153         restartT = CmiWallTimer();
1154         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1155       }
1156       stage = (char*)"resetLB";
1157       startTime = curTime;
1158       if (CkMyPe() == diePe)
1159         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1160
1161 #if CK_NO_PROC_POOL
1162       failed(diePe);    // add into the list of failed pes
1163 #endif
1164       thisFailedPe = diePe;
1165
1166       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1167
1168       inRestarting = 1;
1169
1170       // disable load balancer's barrier
1171       //if (CkMyPe() != diePe) resetLB(diePe);
1172
1173       CKLOCMGR_LOOP(mgr->startInserting(););
1174
1175
1176       if(CmiNumPartition()==1){
1177         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1178       }else{
1179         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1180         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1181       }
1182 #endif
1183     }
1184
1185     // loally remove all array elements
1186     void CkMemCheckPT::removeArrayElements()
1187     {
1188 #if CMK_MEM_CHECKPOINT
1189       int len = ckTable.length();
1190       double curTime = CmiWallTimer();
1191       if (CkMyPe() == thisFailedPe) 
1192         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1193       stage = (char*)"removeArrayElements";
1194       startTime = curTime;
1195
1196       //  if (cpCallback.isInvalid()) 
1197       //          CkPrintf("invalid pe %d\n",CkMyPe());
1198       //          CkAbort("Didn't set restart callback\n");;
1199       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1200
1201       // get rid of all buffering and remote recs
1202       // including destorying all array elements
1203 #if CK_NO_PROC_POOL  
1204       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1205 #else
1206       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1207 #endif
1208       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1209 #endif
1210     }
1211
1212     // flush state in reduction manager
1213     void CkMemCheckPT::resetReductionMgr()
1214     {
1215       if (CkMyPe() == thisFailedPe) 
1216         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1217       int numGroups = CkpvAccess(_groupIDTable)->size();
1218       for(int i=0;i<numGroups;i++) {
1219         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1220         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1221         obj->flushStates();
1222         obj->ckJustMigrated();
1223       }
1224       // reset again
1225       //CpvAccess(_qd)->flushStates();
1226       if(CmiNumPartition()==1){
1227         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1228       }
1229       else
1230         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1231     }
1232
1233     // recover the lost buddies
1234     void CkMemCheckPT::recoverBuddies()
1235     {
1236       int idx;
1237       int len = ckTable.length();
1238       // ready to flush reduction manager
1239       // cannot be CkMemCheckPT::restart because destory will modify states
1240       double curTime = CmiWallTimer();
1241       if (CkMyPe() == thisFailedPe)
1242         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1243       stage = (char *)"recoverBuddies";
1244       if (CkMyPe() == thisFailedPe)
1245         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1246       startTime = curTime;
1247
1248       // recover buddies
1249       expectCount = 0;
1250 #if !CMK_CHKP_ALL
1251       for (idx=0; idx<len; idx++) {
1252         CkCheckPTInfo *entry = ckTable[idx];
1253         if (entry->pNo == thisFailedPe) {
1254 #if CK_NO_PROC_POOL
1255           // find a new buddy
1256           int budPe = BuddyPE(CkMyPe());
1257 #else
1258           int budPe = thisFailedPe;
1259 #endif
1260           CkArrayCheckPTMessage *msg = entry->getCopy();
1261           msg->bud1 = budPe;
1262           msg->bud2 = CkMyPe();
1263           msg->cp_flag = 0;            // not checkpointing
1264           thisProxy[budPe].recoverEntry(msg);
1265           expectCount ++;
1266         }
1267       }
1268 #else
1269       //send to failed pe
1270       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1271 #if CK_NO_PROC_POOL
1272         // find a new buddy
1273         int budPe = BuddyPE(CkMyPe());
1274 #else
1275         int budPe = thisFailedPe;
1276 #endif
1277         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1278         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1279         msg->cp_flag = 0;            // not checkpointing
1280         msg->bud1 = budPe;
1281         msg->bud2 = CkMyPe();
1282         thisProxy[budPe].recoverEntry(msg);
1283         expectCount ++;
1284       }
1285 #endif
1286
1287       if (expectCount == 0) {
1288         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1289       }
1290       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1291     }
1292
1293     void CkMemCheckPT::gotData()
1294     {
1295       ackCount ++;
1296       if (ackCount == expectCount) {
1297         ackCount = 0;
1298         expectCount = -1;
1299         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1300         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1301       }
1302     }
1303
1304     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1305     {
1306
1307       for (int i=0; i<n; i++) {
1308         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1309         mgr->updateLocation(idx[i], nowOnPe);
1310       }
1311       thisProxy[nowOnPe].gotReply();
1312     }
1313
1314     // restore array elements
1315     void CkMemCheckPT::recoverArrayElements()
1316     {
1317       double curTime = CmiWallTimer();
1318       int len = ckTable.length();
1319       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1320       stage = (char *)"recoverArrayElements";
1321       if (CkMyPe() == thisFailedPe)
1322         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1323       startTime = curTime;
1324       int flag = 0;
1325       // recover all array elements
1326       int count = 0;
1327
1328 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1329       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1330       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1331 #endif
1332
1333 #if !CMK_CHKP_ALL
1334       for (int idx=0; idx<len; idx++)
1335       {
1336         CkCheckPTInfo *entry = ckTable[idx];
1337 #if CK_NO_PROC_POOL
1338         // the bigger one will do 
1339         //    if (CkMyPe() < entry->pNo) continue;
1340         if (!isMaster(entry->pNo)) continue;
1341 #else
1342         // smaller one do it, which has the original object
1343         if (CkMyPe() == entry->pNo+1 || 
1344             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1345 #endif
1346         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1347
1348         entry->updateBuddy(CkMyPe(), entry->pNo);
1349         CkArrayCheckPTMessage *msg = entry->getCopy();
1350         // gzheng
1351         //thisProxy[CkMyPe()].inmem_restore(msg);
1352         inmem_restore(msg);
1353 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1354         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1355         int homePe = mgr->homePe(msg->index);
1356         if (homePe != CkMyPe()) {
1357           gmap[homePe].push_back(msg->locMgr);
1358           imap[homePe].push_back(msg->index);
1359         }
1360 #endif
1361         CkFreeMsg(msg);
1362         count ++;
1363       }
1364 #else
1365       char * packData;
1366       if(CmiNumPartition()==1){
1367         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1368         packData = (char *)msg->packData;
1369       }
1370       else{
1371         int pointer = CpvAccess(curPointer);
1372         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1373         packData = msg->packData;
1374       }
1375 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1376       recoverAll(packData,gmap,imap);
1377 #else
1378       recoverAll(packData);
1379 #endif
1380 #endif
1381 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1382       for (int i=0; i<CkNumPes(); i++) {
1383         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1384           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1385           flag++;       
1386         }
1387       }
1388       delete [] imap;
1389       delete [] gmap;
1390 #endif
1391       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1392
1393       CKLOCMGR_LOOP(mgr->doneInserting(););
1394
1395       // _crashedNode = -1;
1396       CpvAccess(_crashedNode) = -1;
1397 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1398       if (CkMyPe() == 0)
1399         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1400 #else
1401       if(flag == 0)
1402       {
1403         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1404       }
1405 #endif
1406     }
1407
1408     void CkMemCheckPT::gotReply(){
1409       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1410     }
1411
1412     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1413 #if CMK_CHKP_ALL
1414       PUP::fromMem p(packData);
1415       int numElements;
1416       p|numElements;
1417       if(p.isUnpacking()){
1418         for(int i=0;i<numElements;i++){
1419           CkGroupID gID;
1420           CkArrayIndex idx;
1421           p|gID;
1422           p|idx;
1423           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1424           int homePe = mgr->homePe(idx);
1425 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1426           mgr->resume(idx,p,CmiTrue,CmiTrue);
1427 #else
1428           if(CmiNumPartition()==1)
1429             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1430           else{
1431             if(CkMyPe()==thisFailedPe){
1432               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1433             }
1434             else{
1435               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1436             }
1437           }
1438 #endif
1439 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1440           homePe = mgr->homePe(idx);
1441           if (homePe != CkMyPe()) {
1442             gmap[homePe].push_back(gID);
1443             imap[homePe].push_back(idx);
1444           }
1445 #endif
1446         }
1447       }
1448 #endif
1449     }
1450
1451
1452     // on every processor
1453     // turn load balancer back on
1454     void CkMemCheckPT::finishUp()
1455     {
1456       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1457       //CKLOCMGR_LOOP(mgr->doneInserting(););
1458
1459       if (CkMyPe() == thisFailedPe)
1460       {
1461         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1462         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1463         fflush(stdout);
1464       }
1465       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1466       inRestarting = 0;
1467
1468 #if CMK_CONVERSE_MPI    
1469       if(CmiNumPartition()!=1){
1470         CpvAccess(recvdProcChkp) = 0;
1471         CpvAccess(recvdArrayChkp) = 0;
1472         CpvAccess(curPointer)^=1;
1473         //notify my replica, restart is done
1474         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1475         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1476         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1477       }
1478       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1479         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1480       }
1481 #endif
1482
1483 #if CK_NO_PROC_POOL
1484 #if NODE_CHECKPOINT
1485       int numnodes = CmiNumPhysicalNodes();
1486 #else
1487       int numnodes = CkNumPes();
1488 #endif
1489       if (numnodes-totalFailed() <=2) {
1490         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1491         _memChkptOn = 0;
1492       }
1493 #endif
1494     }
1495
1496     void CkMemCheckPT::recoverFromSoftFailure()
1497     {
1498       inRestarting = 0;
1499       CpvAccess(recvdRemote) = 0;
1500       CpvAccess(recvdLocal) = 0;
1501       CpvAccess(localChkpDone) = 0;
1502       CpvAccess(remoteChkpDone) = 0;
1503       inCheckpointing = 0;
1504       notifyReplica = 0;
1505       if(CkMyPe() == 0){
1506         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1507       }
1508       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1509     }
1510     // called only on 0
1511     void CkMemCheckPT::quiescence(CkCallback &cb)
1512     {
1513       static int pe_count = 0;
1514       pe_count ++;
1515       CmiAssert(CkMyPe() == 0);
1516       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1517       if (pe_count == CkNumPes()) {
1518         pe_count = 0;
1519         cb.send();
1520       }
1521     }
1522
1523     // User callable function - to start a checkpoint
1524     // callback cb is used to pass control back
1525     void CkStartMemCheckpoint(CkCallback &cb)
1526     {
1527 #if CMK_MEM_CHECKPOINT
1528       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1529       /*if (_memChkptOn == 0) {
1530         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1531         cb.send();
1532         return;
1533         }*/
1534       if (CkInRestarting()) {
1535         // trying to checkpointing during restart
1536         cb.send();
1537         return;
1538       }
1539       // store user callback and user data
1540       CkMemCheckPT::cpCallback = cb;
1541
1542       // broadcast to start check pointing
1543       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1544       checkptMgr.doItNow(CkMyPe(), cb);
1545 #else
1546       // when mem checkpoint is disabled, invike cb immediately
1547       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1548       cb.send();
1549 #endif
1550     }
1551
1552     void CkRestartCheckPoint(int diePe)
1553     {
1554       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1555       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1556       // broadcast
1557       checkptMgr.restart(diePe);
1558     }
1559
1560     static int _diePE = -1;
1561
1562     // callback function used locally by ccs handler
1563     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1564     {
1565       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1566       CkRestartCheckPoint(_diePE);
1567     }
1568
1569
1570     // called on crashed PE
1571     static void restartBeginHandler(char *msg)
1572     {
1573       CmiFree(msg);
1574 #if CMK_MEM_CHECKPOINT
1575 #if CMK_USE_BARRIER
1576       if(CkMyPe()!=_diePE){
1577         printf("restar begin on %d\n",CkMyPe());
1578         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1579         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1580         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1581       }else{
1582         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1583         CkRestartCheckPointCallback(NULL, NULL);
1584       }
1585 #else
1586       static int count = 0;
1587       CmiAssert(CkMyPe() == _diePE);
1588       count ++;
1589       if (count == CkNumPes()) {
1590         printf("restart begin on %d\n",CkMyPe());
1591         CkRestartCheckPointCallback(NULL, NULL);
1592         count = 0;
1593       }
1594 #endif
1595 #endif
1596     }
1597
1598     extern void _discard_charm_message();
1599     extern void _resume_charm_message();
1600
1601     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1602       return data;
1603     }
1604
1605     static void restartBcastHandler(char *msg)
1606     {
1607 #if CMK_MEM_CHECKPOINT
1608       // advance phase counter
1609       CkMemCheckPT::inRestarting = 1;
1610       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1611       // gzheng
1612       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1613
1614       if (CkMyPe()==_diePE)
1615         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1616
1617       // reset QD counters
1618       /*  gzheng
1619           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1620        */
1621
1622       /*  gzheng
1623           if (CkMyPe()==_diePE)
1624           CkRestartCheckPointCallback(NULL, NULL);
1625        */
1626       CmiFree(msg);
1627
1628       _resume_charm_message();
1629
1630       // reduction
1631       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1632       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1633 #if CMK_USE_BARRIER
1634       //CmiPrintf("before reduce\n");   
1635       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1636       //CmiPrintf("after reduce\n");    
1637 #else
1638       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1639 #endif 
1640       checkpointed = 0;
1641 #endif
1642     }
1643
1644     extern void _initDone();
1645
1646     bool compare(char * buf1, char *buf2){
1647       if(CkMyPe()==0)
1648         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1649       PUP::checker pchecker(buf1,buf2);
1650       pchecker.skip();
1651       int numElements;
1652       pchecker|numElements;
1653       for(int i=0;i<numElements;i++){
1654         CkGroupID gID;
1655         CkArrayIndex idx;
1656
1657         pchecker|gID;
1658         pchecker|idx;
1659
1660         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1661         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1662       }
1663       if(CkMyPe()==0)
1664         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1665       int fault_num = pchecker.getFaultNum();
1666       bool result = pchecker.getResult();
1667       if(!result){
1668         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1669       }
1670       return result;
1671     }
1672     int getChecksum(char * buf){
1673       PUP::checker pchecker(buf);
1674       pchecker.skip();
1675       int numElements;
1676       pchecker|numElements;
1677 //      CkPrintf("num %d\n",numElements);
1678       for(int i=0;i<numElements;i++){
1679         CkGroupID gID;
1680         CkArrayIndex idx;
1681
1682         pchecker|gID;
1683         pchecker|idx;
1684
1685         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1686         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1687       }
1688       return pchecker.getChecksum();
1689     }
1690
1691     static void recvRemoteChkpHandler(char *msg){
1692 //#if CMK_USE_CHECKSUM
1693       if(CpvAccess(use_checksum)){
1694         if(CkMyPe()==0)
1695           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1696         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1697         CpvAccess(recvdRemote) = 1;
1698         if(CpvAccess(recvdLocal)==1){
1699           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1700             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1701           }
1702           else{
1703             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1704           }
1705         }
1706       }else{
1707 //#else
1708         envelope *env = (envelope *)msg;
1709         CkUnpackMessage(&env);
1710         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1711         if(CpvAccess(recvdLocal)==1){
1712           int pointer = CpvAccess(curPointer);
1713           int size = CpvAccess(chkpBuf)[pointer]->len;
1714           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1715             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1716           }else
1717           {
1718             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1719           }
1720           delete chkpMsg;
1721           if(CkMyPe()==0)
1722             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1723         }else{
1724           CpvAccess(recvdRemote) = 1;
1725           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1726           CpvAccess(buddyBuf) = chkpMsg;
1727         }
1728       }
1729 //#endif
1730     }
1731
1732     static void replicaRecoverHandler(char *msg){
1733       CpvAccess(_remoteCrashedNode) = -1;
1734       CkMemCheckPT::replicaAlive = 1;
1735       //fflush(stdout);
1736       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1737       bool ret = true;
1738       CpvAccess(remoteChkpDone) = 1;
1739       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1740       CmiFree(msg);
1741
1742     }
1743     static void replicaChkpDoneHandler(char *msg){
1744       CpvAccess(remoteChkpDone) = 1;
1745       if(CpvAccess(localChkpDone) == 1)
1746         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1747       CmiFree(msg);
1748     }
1749
1750     static void replicaDieHandler(char * msg){
1751 #if CMK_CONVERSE_MPI    
1752       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1753       CpvAccess(_remoteCrashedNode) = diePe;
1754       CkMemCheckPT::replicaAlive = 0;
1755       if(CkMyPe()==diePe){
1756         CmiPrintf("pe %d in replicad word die\n",diePe);
1757         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1758         fflush(stdout);
1759       }
1760       find_spare_mpirank(diePe,CmiMyPartition()^1);
1761 #endif
1762       //broadcast to my partition to get local max iter
1763       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1764       CmiFree(msg);
1765     }
1766
1767
1768     static void replicaDieBcastHandler(char *msg){
1769       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1770       CpvAccess(_remoteCrashedNode) = diePe;
1771       CkMemCheckPT::replicaAlive = 0;
1772       CmiFree(msg);
1773     }
1774
1775     static void recoverRemoteProcDataHandler(char *msg){
1776       envelope *env = (envelope *)msg;
1777       CkUnpackMessage(&env);
1778       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1779
1780       //store the checkpoint
1781       int pointer = procMsg->pointer;
1782
1783
1784       if(CkMyPe()==CpvAccess(_crashedNode)){
1785         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1786         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1787         PUP::fromMem p(procMsg->packData);
1788         _handleProcData(p,CmiTrue);
1789         _initDone();
1790       }
1791       else{
1792         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1793         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1794         //_handleProcData(p,CmiFalse);
1795       }
1796       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1797       CKLOCMGR_LOOP(mgr->startInserting(););
1798
1799       CpvAccess(recvdProcChkp) =1;
1800       if(CpvAccess(recvdArrayChkp)==1){
1801         _resume_charm_message();
1802         _diePE = CpvAccess(_crashedNode);
1803         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1804         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1805         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1806       }
1807     }
1808
1809     static void recoverRemoteArrayDataHandler(char *msg){
1810       envelope *env = (envelope *)msg;
1811       CkUnpackMessage(&env);
1812       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1813
1814       //store the checkpoint
1815       int pointer = chkpMsg->pointer;
1816       CpvAccess(curPointer) = pointer;
1817       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1818       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1819       CpvAccess(recvdArrayChkp) =1;
1820       CkMemCheckPT::inRestarting = 1;
1821       if(CpvAccess(recvdProcChkp) == 1){
1822         _resume_charm_message();
1823         _diePE = CpvAccess(_crashedNode);
1824         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1825         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1826         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1827         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1828       }
1829     }
1830
1831     static void recvPhaseHandler(char * msg)
1832     {
1833       CpvAccess(_curRestartPhase)--;
1834       CkMemCheckPT::inRestarting = 1;
1835       CmiFree(msg);
1836     }
1837     // called on crashed processor
1838     static void recoverProcDataHandler(char *msg)
1839     {
1840 #if CMK_MEM_CHECKPOINT
1841       int i;
1842       envelope *env = (envelope *)msg;
1843       CkUnpackMessage(&env);
1844       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1845       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1846       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1847       PUP::fromMem p(procMsg->packData);
1848       _handleProcData(p,CmiTrue);
1849
1850       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1851       // gzheng
1852       CKLOCMGR_LOOP(mgr->startInserting(););
1853
1854       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1855       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1856       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1857       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1858
1859       _initDone();
1860       //   CpvAccess(_qd)->flushStates();
1861       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1862 #endif
1863     }
1864     //for replica, got the phase number from my neighbor processor in the current partition
1865     static void askPhaseHandler(char *msg)
1866     {
1867 #if CMK_MEM_CHECKPOINT
1868       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1869       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1870       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1871       CmiSetHandler(msg, recvPhaseHandlerIdx);
1872       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1873 #endif
1874     }
1875     // called on its backup processor
1876     // get backup message buffer and sent to crashed processor
1877     static void askProcDataHandler(char *msg)
1878     {
1879 #if CMK_MEM_CHECKPOINT
1880       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1881       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1882       if (CpvAccess(procChkptBuf) == NULL)  {
1883         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1884         CkAbort("no checkpoint found");
1885       }
1886       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1887       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1888
1889       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1890
1891       CkPackMessage(&env);
1892       CmiSetHandler(env, recoverProcDataHandlerIdx);
1893       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1894       CpvAccess(procChkptBuf) = NULL;
1895       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1896 #endif
1897     }
1898
1899     // called on PE 0
1900     void qd_callback(void *m)
1901     {
1902       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1903       fflush(stdout);
1904       CkFreeMsg(m);
1905       if(CmiNumPartition()==1){
1906 #ifdef CMK_SMP
1907         for(int i=0;i<CmiMyNodeSize();i++){
1908           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1909           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1910           CmiSetHandler(msg, askProcDataHandlerIdx);
1911           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1912           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1913         }
1914         return;
1915 #endif
1916         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1917         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1918         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1919         CmiSetHandler(msg, askProcDataHandlerIdx);
1920         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1921         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1922       }
1923       else{
1924         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1925         CmiSetHandler(msg, recvPhaseHandlerIdx);
1926         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1927       }
1928     }
1929
1930     // on crashed node
1931     void CkMemRestart(const char *dummy, CkArgMsg *args)
1932     {
1933 #if CMK_MEM_CHECKPOINT
1934       _diePE = CmiMyNode();
1935       CpvAccess( _crashedNode )= CmiMyNode();
1936       CkMemCheckPT::inRestarting = 1;
1937       _discard_charm_message();
1938
1939       /*if(CmiMyRank()==0){
1940         CkCallback cb(qd_callback);
1941         CkStartQD(cb);
1942         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1943         }*/
1944       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1945       if(CmiNumPartition()==1){
1946         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1947         restartT = CmiWallTimer();
1948         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1949         fflush(stdout);
1950         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1951         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1952         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1953         CmiSetHandler(msg, askProcDataHandlerIdx);
1954         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1955         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1956       }
1957       else{
1958         CkCallback cb(qd_callback);
1959         CkStartQD(cb);
1960       }
1961 #else
1962       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1963 #endif
1964     }
1965
1966     // can be called in other files
1967     // return true if it is in restarting
1968     extern "C"
1969       int CkInRestarting()
1970       {
1971 #if CMK_MEM_CHECKPOINT
1972         if (CpvAccess( _crashedNode)!=-1) return 1;
1973         // gzheng
1974         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1975         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1976         return CkMemCheckPT::inRestarting;
1977 #else
1978         return 0;
1979 #endif
1980       }
1981
1982     extern "C"
1983       int CkReplicaAlive()
1984       {
1985         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1986         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1987         return CkMemCheckPT::replicaAlive;
1988
1989         /*if(CkMemCheckPT::replicaDead==1)
1990           return 0;
1991           else          
1992           return 1;*/
1993       }
1994
1995     extern "C"
1996       int CkInCheckpointing()
1997       {
1998         return CkMemCheckPT::inCheckpointing;
1999       }
2000
2001     extern "C"
2002       void CkSetInLdb(){
2003 #if CMK_MEM_CHECKPOINT
2004         CkMemCheckPT::inLoadbalancing = 1;
2005 #endif
2006       }
2007
2008     extern "C"
2009       int CkInLdb(){
2010 #if CMK_MEM_CHECKPOINT
2011         return CkMemCheckPT::inLoadbalancing;
2012 #endif
2013         return 0;
2014       }
2015
2016     extern "C"
2017       void CkResetInLdb(){
2018 #if CMK_MEM_CHECKPOINT
2019         CkMemCheckPT::inLoadbalancing = 0;
2020 #endif
2021       }
2022
2023     /*****************************************************************************
2024       module initialization
2025      *****************************************************************************/
2026
2027     static int arg_where = CkCheckPoint_inMEM;
2028
2029 #if CMK_MEM_CHECKPOINT
2030     void init_memcheckpt(char **argv)
2031     {
2032       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2033         arg_where = CkCheckPoint_inDISK;
2034       }
2035       CpvInitialize(int, use_checksum);
2036       CpvAccess(use_checksum)=0;
2037       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2038         CpvAccess(use_checksum)=1;
2039       }
2040       // initiliazing _crashedNode variable
2041       CpvInitialize(int, _crashedNode);
2042       CpvInitialize(int, _remoteCrashedNode);
2043       CpvAccess(_crashedNode) = -1;
2044       CpvAccess(_remoteCrashedNode) = -1;
2045       init_FI(argv);
2046     }
2047 #endif
2048
2049     class CkMemCheckPTInit: public Chare {
2050       public:
2051         CkMemCheckPTInit(CkArgMsg *m) {
2052 #if CMK_MEM_CHECKPOINT
2053           if (arg_where == CkCheckPoint_inDISK) {
2054             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2055           }
2056           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2057           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2058 #endif
2059         }
2060     };
2061
2062     static void notifyHandler(char *msg)
2063     {
2064 #if CMK_MEM_CHECKPOINT
2065       CmiFree(msg);
2066       /* immediately increase restart phase to filter old messages */
2067       CpvAccess(_curRestartPhase) ++;
2068       CpvAccess(_qd)->flushStates();
2069       _discard_charm_message();
2070
2071 #endif
2072     }
2073
2074     extern "C"
2075       void notify_crash(int node)
2076       {
2077 #ifdef CMK_MEM_CHECKPOINT
2078         CpvAccess( _crashedNode) = node;
2079 #ifdef CMK_SMP
2080         for(int i=0;i<CkMyNodeSize();i++){
2081           CpvAccessOther(_crashedNode,i)=node;
2082         }
2083 #endif
2084         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2085         CkMemCheckPT::inRestarting = 1;
2086
2087         // this may be in interrupt handler, send a message to reset QD
2088         int pe = CmiNodeFirst(CkMyNode());
2089         for(int i=0;i<CkMyNodeSize();i++){
2090           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2091           CmiSetHandler(msg, notifyHandlerIdx);
2092           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2093         }
2094 #endif
2095       }
2096
2097     extern "C" void (*notify_crash_fn)(int node);
2098
2099
2100 #if CMK_CONVERSE_MPI
2101     void buddyDieHandler(char *msg)
2102     {
2103 #if CMK_MEM_CHECKPOINT
2104       // notify
2105       CkMemCheckPT::inRestarting = 1;
2106       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2107       notify_crash(diepe);
2108       // send message to crash pe to let it restart
2109       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2110       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2111       int buddy = obj->BuddyPE(CmiMyPe());
2112       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2113       if (buddy == diepe)  {
2114         mpi_restart_crashed(diepe, newrank);
2115       }
2116 #endif
2117     }
2118
2119     void pingHandler(void *msg)
2120     {
2121       lastPingTime = CmiWallTimer();
2122       CmiFree(msg);
2123     }
2124
2125     void pingCheckHandler()
2126     {
2127 #if CMK_MEM_CHECKPOINT
2128       double now = CmiWallTimer();
2129       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2130         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2131         int i, pe, buddy;
2132         // tell everyone the buddy dies
2133         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2134         for (i = 1; i < CmiNumPes(); i++) {
2135           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2136           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2137         }
2138         buddy = pe;
2139         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2140         fflush(stdout);
2141         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2142         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2143         CmiSetHandler(msg, buddyDieHandlerIdx);
2144         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2145         //send to everyone in the other world
2146         if(CmiNumPartition()!=1){
2147           for(int i=0;i<CmiNumPes();i++){
2148             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2149             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2150             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2151             //fflush(stdout);
2152             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2153             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2154           }
2155         }
2156       }
2157         else 
2158           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2159 #endif
2160       }
2161
2162       void pingBuddy()
2163       {
2164 #if CMK_MEM_CHECKPOINT
2165         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2166         if (obj) {
2167           int buddy = obj->BuddyPE(CkMyPe());
2168           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2169           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2170           CmiSetHandler(msg, pingHandlerIdx);
2171           CmiGetRestartPhase(msg) = 9999;
2172           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2173         }
2174         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2175 #endif
2176       }
2177 #endif
2178
2179       // initproc
2180       void CkRegisterRestartHandler( )
2181       {
2182 #if CMK_MEM_CHECKPOINT
2183         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2184         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2185         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2186         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2187         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2188
2189         //for replica
2190         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2191         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2192         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2193         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2194         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2195         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2196         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2197         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2198         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2199
2200 #if CMK_CONVERSE_MPI
2201         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2202         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2203         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2204 #endif
2205
2206         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2207         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2208         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2209         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2210         CpvInitialize(int,curPointer);
2211         CpvInitialize(int,recvdLocal);
2212         CpvInitialize(int,localChkpDone);
2213         CpvInitialize(int,remoteChkpDone);
2214         CpvInitialize(int,recvdRemote);
2215         CpvInitialize(int,recvdProcChkp);
2216         CpvInitialize(int,localChecksum);
2217         CpvInitialize(int,remoteChecksum);
2218         CpvInitialize(int,recvdArrayChkp);
2219
2220         CpvAccess(procChkptBuf) = NULL;
2221         CpvAccess(buddyBuf) = NULL;
2222         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2223         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2224         CpvAccess(chkpBuf)[0] = NULL;
2225         CpvAccess(chkpBuf)[1] = NULL;
2226         CpvAccess(localProcChkpBuf)[0] = NULL;
2227         CpvAccess(localProcChkpBuf)[1] = NULL;
2228
2229         CpvAccess(curPointer) = 0;
2230         CpvAccess(recvdLocal) = 0;
2231         CpvAccess(localChkpDone) = 0;
2232         CpvAccess(remoteChkpDone) = 0;
2233         CpvAccess(recvdRemote) = 0;
2234         CpvAccess(recvdProcChkp) = 0;
2235         CpvAccess(localChecksum) = 0;
2236         CpvAccess(remoteChecksum) = 0;
2237         CpvAccess(recvdArrayChkp) = 0;
2238
2239         notify_crash_fn = notify_crash;
2240
2241 #if ! CMK_CONVERSE_MPI
2242         // print pid to kill
2243         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2244         //  sleep(4);
2245 #endif
2246 #endif
2247       }
2248
2249
2250       extern "C"
2251         int CkHasCheckpoints()
2252         {
2253           return checkpointed;
2254         }
2255
2256       /// @todo: the following definitions should be moved to a separate file containing
2257       // structures and functions about fault tolerance strategies
2258
2259       /**
2260        *  * @brief: function for killing a process                                             
2261        *   */
2262 #ifdef CMK_MEM_CHECKPOINT
2263 #if CMK_HAS_GETPID
2264       void killLocal(void *_dummy,double curWallTime){
2265         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2266         if(CmiWallTimer()<killTime-1){
2267           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2268         }else{ 
2269 #if CMK_CONVERSE_MPI
2270           CkDieNow();
2271 #else 
2272           kill(getpid(),SIGKILL);                                               
2273 #endif
2274         }              
2275       } 
2276 #else
2277       void killLocal(void *_dummy,double curWallTime){
2278         CmiAbort("kill() not supported!");
2279       }
2280 #endif
2281 #endif
2282
2283 #ifdef CMK_MEM_CHECKPOINT
2284       /**
2285        * @brief: reads the file with the kill information
2286        */
2287       void readKillFile(){
2288         FILE *fp=fopen(killFile,"r");
2289         if(!fp){
2290           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2291           return;
2292         }
2293         int proc;
2294         double sec;
2295         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2296           if(proc == CkMyNode() && CkMyRank() == 0){
2297             killTime = CmiWallTimer()+sec;
2298             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2299             CcdCallFnAfter(killLocal,NULL,sec*1000);
2300           }
2301         }
2302         fclose(fp);
2303       }
2304
2305 #if ! CMK_CONVERSE_MPI
2306       void CkDieNow()
2307       {
2308 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2309         // ignored for non-mpi version
2310         CmiPrintf("[%d] die now.\n", CmiMyPe());
2311         killTime = CmiWallTimer()+0.001;
2312         CcdCallFnAfter(killLocal,NULL,1);
2313 #endif
2314       }
2315 #endif
2316
2317 #endif
2318
2319 #include "CkMemCheckpoint.def.h"
2320
2321