optimize for recover
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, _remoteCrashedNode);
78
79 // static, so that it is accessible from Converse part
80 int CkMemCheckPT::inRestarting = 0;
81 int CkMemCheckPT::inCheckpointing = 0;
82 int CkMemCheckPT::replicaAlive = 1;
83 int CkMemCheckPT::inLoadbalancing = 0;
84 double CkMemCheckPT::startTime;
85 char *CkMemCheckPT::stage;
86 CkCallback CkMemCheckPT::cpCallback;
87
88 int _memChkptOn = 1;                    // checkpoint is on or off
89
90 CkGroupID ckCheckPTGroupID;             // readonly
91
92 static int checkpointed = 0;
93
94 /// @todo the following declarations should be moved into a separate file for all 
95 // fault tolerant strategies
96
97 #ifdef CMK_MEM_CHECKPOINT
98 // name of the kill file that contains processes to be killed 
99 char *killFile;                                               
100 // flag for the kill file         
101 int killFlag=0;
102 // variable for storing the killing time
103 double killTime=0.0;
104 #endif
105
106 #ifdef CKLOCMGR_LOOP
107 #undef CKLOCMGR_LOOP
108 #endif
109 // loop over all CkLocMgr and do "code"
110 #define  CKLOCMGR_LOOP(code)    {       \
111   int numGroups = CkpvAccess(_groupIDTable)->size();    \
112   for(int i=0;i<numGroups;i++) {        \
113     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
114     if(obj->isLocMgr())  {      \
115       CkLocMgr *mgr = (CkLocMgr*)obj;   \
116       code      \
117     }   \
118   }     \
119 }
120
121 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
122 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
123 CpvDeclare(CkCheckPTMessage**, chkpBuf);
124 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
125 //store the checkpoint of the buddy to compare
126 //do not need the whole msg, can be the checksum
127 CpvDeclare(CkCheckPTMessage*, buddyBuf);
128 //pointer of the checkpoint going to be written
129 CpvDeclare(int, curPointer);
130 CpvDeclare(int, recvdRemote);
131 CpvDeclare(int, recvdLocal);
132 CpvDeclare(int, localChkpDone);
133 CpvDeclare(int, remoteChkpDone);
134 CpvDeclare(int, recvdArrayChkp);
135 CpvDeclare(int, recvdProcChkp);
136 CpvDeclare(int, localChecksum);
137 CpvDeclare(int, remoteChecksum);
138
139 bool compare(char * buf1, char * buf2);
140 int getChecksum(char * buf);
141 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
142 // Converse function handles
143 static int askPhaseHandlerIdx;
144 static int recvPhaseHandlerIdx;
145 static int askProcDataHandlerIdx;
146 static int restartBcastHandlerIdx;
147 static int recoverProcDataHandlerIdx;
148 static int restartBeginHandlerIdx;
149 static int recvRemoteChkpHandlerIdx;
150 static int replicaDieHandlerIdx;
151 static int replicaDieBcastHandlerIdx;
152 static int replicaRecoverHandlerIdx;
153 static int replicaChkpDoneHandlerIdx;
154 static int recoverRemoteProcDataHandlerIdx;
155 static int recoverRemoteArrayDataHandlerIdx;
156 static int notifyHandlerIdx;
157 // compute the backup processor
158 // FIXME: avoid crashed processors
159 #if CMK_CONVERSE_MPI
160 static int pingHandlerIdx;
161 static int pingCheckHandlerIdx;
162 static int buddyDieHandlerIdx;
163 static double lastPingTime = -1;
164
165 extern "C" void mpi_restart_crashed(int pe, int rank);
166 extern "C" int  find_spare_mpirank(int pe,int partition);
167
168 void pingBuddy();
169 void pingCheckHandler();
170
171 #endif
172 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
173
174 inline int CkMemCheckPT::BuddyPE(int pe)
175 {
176   int budpe;
177 #if NODE_CHECKPOINT
178   // buddy is the processor with same rank on the next physical node
179   int r1 = CmiPhysicalRank(pe);
180   int budnode = CmiPhysicalNodeID(pe);
181   do {
182     budnode = (budnode+1)%CmiNumPhysicalNodes();
183     int *pelist;
184     int num;
185     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
186     budpe = pelist[r1 % num];
187   } while (isFailed(budpe));
188   if (budpe == pe) {
189     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
190     CmiAbort("Failed to find a buddy processor");
191   }
192 #else
193   budpe = pe;
194   budpe = (budpe+1)%CkNumPes();
195 #endif
196   return budpe;
197 }
198
199 // called in array element constructor
200 // choose and register with 2 buddies for checkpoiting 
201 #if CMK_MEM_CHECKPOINT
202 void ArrayElement::init_checkpt() {
203   if (_memChkptOn == 0) return;
204   if (CkInRestarting()) {
205     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
206   }
207   // only master init checkpoint
208   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
209
210   budPEs[0] = CkMyPe();
211   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
212   CmiAssert(budPEs[0] != budPEs[1]);
213   // inform checkPTMgr
214   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
215   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
216   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
217   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
218 }
219 #endif
220
221 // entry function invoked by checkpoint mgr asking for checkpoint data
222 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
223 #if CMK_MEM_CHECKPOINT
224   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
225   //char index[128];   thisIndexMax.sprint(index);
226   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
227   CkLocMgr *locMgr = thisArray->getLocMgr();
228   CmiAssert(myRec!=NULL);
229   int size;
230   {
231     PUP::sizer p;
232     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
233     size = p.size();
234   }
235   int packSize = size/sizeof(double) +1;
236   CkArrayCheckPTMessage *msg =
237     new (packSize, 0) CkArrayCheckPTMessage;
238   msg->len = size;
239   msg->index =thisIndexMax;
240   msg->aid = thisArrayID;
241   msg->locMgr = locMgr->getGroupID();
242   msg->cp_flag = 1;
243   {
244     PUP::toMem p(msg->packData);
245     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
246   }
247
248   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
249   checkptMgr.recvData(msg, 2, budPEs);
250   delete m;
251 #endif
252 }
253
254 // checkpoint holder class - for memory checkpointing
255 class CkMemCheckPTInfo: public CkCheckPTInfo
256 {
257   CkArrayCheckPTMessage *ckBuffer;
258   public:
259   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
260     CkCheckPTInfo(a, loc, idx, pno)
261   {
262     ckBuffer = NULL;
263   }
264   ~CkMemCheckPTInfo() 
265   {
266     if (ckBuffer) delete ckBuffer; 
267   }
268   inline void updateBuffer(CkArrayCheckPTMessage *data) 
269   {
270     CmiAssert(data!=NULL);
271     if (ckBuffer) delete ckBuffer;
272     ckBuffer = data;
273   }    
274   inline CkArrayCheckPTMessage * getCopy()
275   {
276     if (ckBuffer == NULL) {
277       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
278       CmiAbort("Abort!");
279     }
280     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
281   }     
282   inline void updateBuddy(int b1, int b2) {
283     CmiAssert(ckBuffer);
284     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
285     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
286     CmiAssert(pNo != CkMyPe());
287   }
288   inline int getSize() { 
289     CmiAssert(ckBuffer);
290     return ckBuffer->len; 
291   }
292 };
293
294 // checkpoint holder class - for in-disk checkpointing
295 class CkDiskCheckPTInfo: public CkCheckPTInfo 
296 {
297   char *fname;
298   int bud1, bud2;
299   int len;                      // checkpoint size
300   public:
301   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
302   {
303 #if CMK_USE_MKSTEMP
304     fname = new char[64];
305     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
306     mkstemp(fname);
307 #else
308     fname=tmpnam(NULL);
309 #endif
310     bud1 = bud2 = -1;
311     len = 0;
312   }
313   ~CkDiskCheckPTInfo() 
314   {
315     remove(fname);
316   }
317   inline void updateBuffer(CkArrayCheckPTMessage *data) 
318   {
319     double t = CmiWallTimer();
320     // unpack it
321     envelope *env = UsrToEnv(data);
322     CkUnpackMessage(&env);
323     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
324     FILE *f = fopen(fname,"wb");
325     PUP::toDisk p(f);
326     CkPupMessage(p, (void **)&data);
327     // delay sync to the end because otherwise the messages are blocked
328     //    fsync(fileno(f));
329     fclose(f);
330     bud1 = data->bud1;
331     bud2 = data->bud2;
332     len = data->len;
333     delete data;
334     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
335   }
336   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
337   {
338     CkArrayCheckPTMessage *data;
339     FILE *f = fopen(fname,"rb");
340     PUP::fromDisk p(f);
341     CkPupMessage(p, (void **)&data);
342     fclose(f);
343     data->bud1 = bud1;                          // update the buddies
344     data->bud2 = bud2;
345     return data;
346   }
347   inline void updateBuddy(int b1, int b2) {
348     bud1 = b1; bud2 = b2;
349     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
350     CmiAssert(pNo != CkMyPe());
351   }
352   inline int getSize() { 
353     return len; 
354   }
355 };
356
357 CkMemCheckPT::CkMemCheckPT(int w)
358 {
359   int numnodes = 0;
360 #if NODE_CHECKPOINT
361   numnodes = CmiNumPhysicalNodes();
362 #else
363   numnodes = CkNumPes();
364 #endif
365 #if CK_NO_PROC_POOL
366   if (numnodes <= 2)
367 #else
368     if (numnodes  == 1)
369 #endif
370     {
371       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
372       _memChkptOn = 0;
373     }
374   inRestarting = 0;
375   inCheckpointing = 0;
376   recvCount = peCount = 0;
377   ackCount = 0;
378   expectCount = -1;
379   where = w;
380   replicaAlive = 1;
381   notifyReplica = 0;
382 #if CMK_CONVERSE_MPI
383   void pingBuddy();
384   void pingCheckHandler();
385   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
386   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
387 #endif
388   chkpTable[0] = NULL;
389   chkpTable[1] = NULL;
390   maxIter = -1;
391   recvIterCount = 0;
392   localDecided = false;
393 }
394
395 CkMemCheckPT::~CkMemCheckPT()
396 {
397   int len = ckTable.length();
398   for (int i=0; i<len; i++) {
399     delete ckTable[i];
400   }
401 }
402
403 void CkMemCheckPT::pup(PUP::er& p) 
404
405   CBase_CkMemCheckPT::pup(p); 
406   p|cpStarter;
407   p|thisFailedPe;
408   p|failedPes;
409   p|ckCheckPTGroupID;           // recover global variable
410   p|cpCallback;                 // store callback
411   p|where;                      // where to checkpoint
412   p|peCount;
413   if (p.isUnpacking()) {
414     recvCount = 0;
415 #if CMK_CONVERSE_MPI
416     void pingBuddy();
417     void pingCheckHandler();
418     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
419     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
420 #endif
421     maxIter = -1;
422     recvIterCount = 0;
423     localDecided = false;
424   }
425 }
426
427 void CkMemCheckPT::getIter(){
428   localDecided = true;
429   localMaxIter = maxIter+1;
430   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
431   int elemCount = CkCountChkpSyncElements();
432   if(elemCount == 0){
433     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
434   }
435 }
436
437 //when one replica failes, decide the next chkp iter
438
439 void CkMemCheckPT::recvIter(int iter){
440   if(maxIter<iter){
441     maxIter=iter;
442   }
443 }
444
445 void CkMemCheckPT::recvMaxIter(int iter){
446   localDecided = false;
447   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
448 }
449
450 void CkMemCheckPT::reachChkpIter(){
451   recvIterCount++;
452   elemCount = CkCountChkpSyncElements();
453   if(recvIterCount == elemCount){
454     recvIterCount = 0;
455     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
456   }
457 }
458
459 void CkMemCheckPT::startChkp(){
460   CkPrintf("start checkpoint\n");
461   CkStartMemCheckpoint(cpCallback);
462 }
463
464 // called by checkpoint mgr to restore an array element
465 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
466 {
467 #if CMK_MEM_CHECKPOINT
468   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
469   // m->index.print();
470   PUP::fromMem p(m->packData);
471   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
472   CmiAssert(mgr);
473 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
474   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
475 #else
476   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
477 #endif
478
479   // find a list of array elements bound together
480   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
481   CmiAssert(elt);
482   CkLocRec_local *rec = elt->myRec;
483   CkVec<CkMigratable *> list;
484   mgr->migratableList(rec, list);
485   CmiAssert(list.length() > 0);
486   for (int l=0; l<list.length(); l++) {
487     elt = (ArrayElement *)list[l];
488     elt->budPEs[0] = m->bud1;
489     elt->budPEs[1] = m->bud2;
490     //    reset, may not needed now
491     // for now.
492     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
493       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
494       if (c) c->redNo = 0;
495     }
496   }
497 #endif
498 }
499
500 // return 1 if pe is a crashed processor
501 int CkMemCheckPT::isFailed(int pe)
502 {
503   for (int i=0; i<failedPes.length(); i++)
504     if (failedPes[i] == pe) return 1;
505   return 0;
506 }
507
508 // add pe into history list of all failed processors
509 void CkMemCheckPT::failed(int pe)
510 {
511   if (isFailed(pe)) return;
512   failedPes.push_back(pe);
513 }
514
515 int CkMemCheckPT::totalFailed()
516 {
517   return failedPes.length();
518 }
519
520 // create an checkpoint entry for array element of aid with index.
521 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
522 {
523   // error check, no duplicate
524   int idx, len = ckTable.size();
525   for (idx=0; idx<len; idx++) {
526     CkCheckPTInfo *entry = ckTable[idx];
527     if (index == entry->index) {
528       if (loc == entry->locMgr) {
529         // bindTo array elements
530         return;
531       }
532       // for array inheritance, the following check may fail
533       // because ArrayElement constructor of all superclasses are called
534       if (aid == entry->aid) {
535         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
536         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
537       }
538     }
539   }
540   CkCheckPTInfo *newEntry;
541   if (where == CkCheckPoint_inMEM)
542     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
543   else
544     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
545   ckTable.push_back(newEntry);
546   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
547 }
548
549 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
550 {
551 #if !CMK_CHKP_ALL       
552   int buddy = msg->bud1;
553   if (buddy == CkMyPe()) buddy = msg->bud2;
554   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
555   recvData(msg);
556   // ack
557   thisProxy[buddy].gotData();
558 #else
559   chkpTable[0] = NULL;
560   chkpTable[1] = NULL;
561   recvArrayCheckpoint(msg);
562   thisProxy[msg->bud2].gotData();
563 #endif
564 }
565
566 // loop through my checkpoint table and ask checkpointed array elements
567 // to send me checkpoint data.
568 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
569 {
570   checkpointed = 1;
571   cpCallback = cb;
572   cpStarter = starter;
573   inCheckpointing = 1;
574   if (CkMyPe() == cpStarter) {
575     startTime = CmiWallTimer();
576     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
577   }
578 #if CMK_CONVERSE_MPI
579   if(CmiNumPartition()==1)
580 #endif    
581   {
582
583 #if !CMK_CHKP_ALL
584     int len = ckTable.length();
585     for (int i=0; i<len; i++) {
586       CkCheckPTInfo *entry = ckTable[i];
587       // always let the bigger number processor send request
588       //if (CkMyPe() < entry->pNo) continue;
589       // always let the smaller number processor send request, may on same proc
590       if (!isMaster(entry->pNo)) continue;
591       // call inmem_checkpoint to the array element, ask it to send
592       // back checkpoint data via recvData().
593       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
594       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
595     }
596     // if my table is empty, then I am done
597     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
598 #else
599     startArrayCheckpoint();
600 #endif
601     sendProcData();
602   }
603 #if CMK_CONVERSE_MPI
604   else
605   {
606     startCheckpoint();
607   }
608 #endif
609   // pack and send proc level data
610 }
611
612 class MemElementPacker : public CkLocIterator{
613   private:
614     //    CkLocMgr *locMgr;
615     PUP::er &p;
616     std::map<CkHashCode,CkLocation> arrayMap;
617   public:
618     MemElementPacker(PUP::er &p_):p(p_){};
619     void addLocation(CkLocation &loc){
620       CkArrayIndexMax idx = loc.getIndex();
621       arrayMap[idx.hash()] = loc;
622     }
623     void writeCheckpoint(){
624       std::map<CkHashCode, CkLocation>::iterator it;
625       for(it = arrayMap.begin();it!=arrayMap.end();it++){
626         CkLocation loc = it->second;
627         CkLocMgr *locMgr = loc.getManager();
628         CkArrayIndexMax idx = loc.getIndex();
629         CkGroupID gID = locMgr->ckGetGroupID();
630         p|gID;
631         p|idx;
632         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
633       }
634     }
635 };
636
637 void pupAllElements(PUP::er &p){
638 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
639   int numElements;
640   if(!p.isUnpacking()){
641     numElements = CkCountArrayElements();
642   }
643   p | numElements;
644   if(!p.isUnpacking()){
645     MemElementPacker packer(p);
646     CKLOCMGR_LOOP(mgr->iterate(packer););
647     packer.writeCheckpoint();
648   }
649 #endif
650 }
651
652 void CkMemCheckPT::startArrayCheckpoint(){
653 #if CMK_CHKP_ALL
654   int size;
655   {
656     PUP::sizer psizer;
657     pupAllElements(psizer);
658     size = psizer.size();
659   }
660   int packSize = size/sizeof(double)+1;
661   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
662   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
663   msg->len = size;
664   msg->cp_flag = 1;
665   int budPEs[2];
666   msg->bud1=CkMyPe();
667   msg->bud2=ChkptOnPe(CkMyPe());
668   {
669     PUP::toMem p(msg->packData);
670     pupAllElements(p);
671   }
672   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
673   if(chkpTable[0]) delete chkpTable[0];
674   chkpTable[0] = msg;
675   //send the checkpoint to my 
676   recvCount++;
677 #endif
678 }
679
680 void CkMemCheckPT::startCheckpoint(){
681 #if CMK_CONVERSE_MPI
682   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
683     CkPrintf("in start checkpointing!!!!\n");
684   int size;
685   {
686     PUP::sizer p;
687     _handleProcData(p,CmiFalse);
688     size = p.size();
689   }
690   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
691   procMsg->len = size;
692   procMsg->cp_flag = 1;
693   {
694     PUP::toMem p(procMsg->packData);
695     _handleProcData(p,CmiFalse);
696   }
697   int pointer = CpvAccess(curPointer);
698   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
699   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
700
701   {
702     PUP::sizer psizer;
703     pupAllElements(psizer);
704     size = psizer.size();
705   }
706   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
707   msg->len = size;
708   msg->cp_flag = 1;
709   int checksum;
710   {
711 //#if CMK_USE_CHECKSUM
712     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
713       PUP::checker p(msg->packData);
714       pupAllElements(p);
715       checksum = p.getChecksum();
716     }else{  
717 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
718 //#else 
719       PUP::toMem p(msg->packData);
720       pupAllElements(p);
721     }
722 //#endif
723   }
724   pointer = CpvAccess(curPointer);
725   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
726   CpvAccess(chkpBuf)[pointer] = msg;
727   if(CkMyPe()==0)
728     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
729   if(CkReplicaAlive()==1){
730     CpvAccess(recvdLocal) = 1;
731 //#if CMK_USE_CHECKSUM
732 //    CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
733 //    CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
734 //    delete tmpMsg;
735     if(CpvAccess(use_checksum)){
736       CpvAccess(localChecksum) = checksum;
737       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
738       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
739       CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
740       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
741     }else{
742 //#else
743       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
744       CkPackMessage(&env);
745       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
746       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
747     }
748 //#endif
749   }
750   if(CpvAccess(recvdRemote)==1){
751     //compare the checkpoint 
752     int size = CpvAccess(chkpBuf)[pointer]->len;
753 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
754 //#if CMK_USE_CHECKSUM
755     if(CpvAccess(use_checksum)){
756       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
757         thisProxy[CkMyPe()].doneComparison(true);
758       }
759       else{
760         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
761         thisProxy[CkMyPe()].doneComparison(false);
762       }
763     }else{
764 //#else
765       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
766         thisProxy[CkMyPe()].doneComparison(true);
767       }
768       else{
769         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
770         thisProxy[CkMyPe()].doneComparison(false);
771       }
772     }
773 //#endif
774     if(CkMyPe()==0)
775       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
776   }
777   else{
778     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
779       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
780       { 
781         int pointer = CpvAccess(curPointer);
782         //send the proc data
783         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
784         procMsg->pointer = pointer;
785         envelope * env = (envelope *)(UsrToEnv(procMsg));
786         CkPackMessage(&env);
787         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
788         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
789         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
790           CkPrintf("[%d] sendProcdata\n",CkMyPe());
791         }
792       }
793       //send the array checkpoint data
794       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
795       msg->pointer = CpvAccess(curPointer);
796       envelope * env = (envelope *)(UsrToEnv(msg));
797       CkPackMessage(&env);
798       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
799       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
800       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
801         CkPrintf("[%d] sendArraydata\n",CkMyPe());
802       //can continue work, no need to wait for my replica
803     }
804   }
805 #endif
806 }
807
808 void CkMemCheckPT::doneComparison(bool ret){
809   int _ret = 1;
810   if(!ret){
811     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
812     _ret = 0;
813   }else{
814     _ret = 1;
815   }
816   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
817   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
818 }
819
820 void CkMemCheckPT::doneRComparison(int ret){
821   //    if(CpvAccess(curPointer) == 0){
822   //if(ret==CkNumPes()){
823     CpvAccess(localChkpDone) = 1;
824     if(CpvAccess(remoteChkpDone) ==1){
825       thisProxy.doneBothComparison();
826     }
827     if(notifyReplica == 0){
828       //notify the replica am done
829       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
830       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
831       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
832       notifyReplica = 1;
833     }
834  // }
835  /* else{
836     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
837     startTime = CmiWallTimer();
838     thisProxy.RollBack();
839   }*/
840 }
841
842 void CkMemCheckPT::doneBothComparison(){
843   CpvAccess(recvdRemote) = 0;
844   CpvAccess(recvdLocal) = 0;
845   CpvAccess(localChkpDone) = 0;
846   CpvAccess(remoteChkpDone) = 0;
847   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
848   CpvAccess(curPointer)^=1;
849   inCheckpointing = 0;
850   notifyReplica = 0;
851   if(CkMyPe() == 0){
852     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
853   }
854   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
855 }
856
857 void CkMemCheckPT::RollBack(){
858   //restore group data
859   checkpointed = 0;
860   CkMemCheckPT::inRestarting = 1;
861   int pointer = CpvAccess(curPointer)^1;//use the previous one
862   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
863   PUP::fromMem p(chkpMsg->packData);    
864
865   //destroy array elements
866   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
867   int numGroups = CkpvAccess(_groupIDTable)->size();
868   for(int i=0;i<numGroups;i++) {
869     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
870     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
871     obj->flushStates();
872     obj->ckJustMigrated();
873   }
874   //restore array elements
875
876   int numElements;
877   p|numElements;
878
879   if(p.isUnpacking()){
880     for(int i=0;i<numElements;i++){
881       CkGroupID gID;
882       CkArrayIndex idx;
883       p|gID;
884       p|idx;
885       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
886       CmiAssert(mgr);
887       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
888     }
889     }
890     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
891     contribute(cb);
892   }
893
894   void CkMemCheckPT::notifyReplicaDie(int pe){
895     replicaAlive = 0;
896     CpvAccess(_remoteCrashedNode) = pe;
897   }
898
899   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
900   {
901 #if CMK_CHKP_ALL
902     int idx = 1;
903     if(msg->bud1 == CkMyPe()){
904       idx = 0;
905     }
906     int isChkpting = msg->cp_flag;
907     if(isChkpting == 1){
908       if(chkpTable[idx]) delete chkpTable[idx];
909     }
910     chkpTable[idx] = msg;
911     if(isChkpting){
912       recvCount++;
913       if(recvCount == 2){
914         if (where == CkCheckPoint_inMEM) {
915           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
916         }
917         else if (where == CkCheckPoint_inDISK) {
918           // another barrier for finalize the writing using fsync
919           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
920           contribute(0,NULL,CkReduction::sum_int,localcb);
921         }
922         else
923           CmiAbort("Unknown checkpoint scheme");
924         recvCount = 0;
925       }
926     }
927 #endif
928   }
929
930   // don't handle array elements
931   static inline void _handleProcData(PUP::er &p, CmiBool create)
932   {
933     // save readonlys, and callback BTW
934     CkPupROData(p);
935
936     // save mainchares 
937     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
938
939 #ifndef CMK_CHARE_USE_PTR
940     // save non-migratable chare
941     CkPupChareData(p);
942 #endif
943
944     // save groups into Groups.dat
945     CkPupGroupData(p,create);
946
947     // save nodegroups into NodeGroups.dat
948     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
949   }
950
951   void CkMemCheckPT::sendProcData()
952   {
953     // find out size of buffer
954     int size;
955     {
956       PUP::sizer p;
957       _handleProcData(p,CmiTrue);
958       size = p.size();
959     }
960     int packSize = size;
961     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
962     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
963     {
964       PUP::toMem p(msg->packData);
965       _handleProcData(p,CmiTrue);
966     }
967     msg->pe = CkMyPe();
968     msg->len = size;
969     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
970     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
971   }
972
973   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
974   {
975     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
976     CpvAccess(procChkptBuf) = msg;
977     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
978     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
979   }
980
981   // ArrayElement call this function to give us the checkpointed data
982   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
983   {
984     int len = ckTable.length();
985     int idx;
986     for (idx=0; idx<len; idx++) {
987       CkCheckPTInfo *entry = ckTable[idx];
988       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
989     }
990     CkAssert(idx < len);
991     int isChkpting = msg->cp_flag;
992     ckTable[idx]->updateBuffer(msg);
993     if (isChkpting) {
994       // all my array elements have returned their inmem data
995       // inform starter processor that I am done.
996       recvCount ++;
997       if (recvCount == ckTable.length()) {
998         if (where == CkCheckPoint_inMEM) {
999           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1000         }
1001         else if (where == CkCheckPoint_inDISK) {
1002           // another barrier for finalize the writing using fsync
1003           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1004           contribute(0,NULL,CkReduction::sum_int,localcb);
1005         }
1006         else
1007           CmiAbort("Unknown checkpoint scheme");
1008         recvCount = 0;
1009       } 
1010     }
1011   }
1012
1013   // only used in disk checkpointing
1014   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1015   {
1016     delete m;
1017 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1018     system("sync");
1019 #endif
1020     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1021   }
1022
1023   // only is called on cpStarter when checkpoint is done
1024   void CkMemCheckPT::cpFinish()
1025   {
1026     CmiAssert(CkMyPe() == cpStarter);
1027     peCount++;
1028     // now that all processors have finished, activate callback
1029     if (peCount == 2) 
1030     {
1031       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1032       peCount = 0;
1033       thisProxy.report();
1034     }
1035   }
1036
1037   // for debugging, report checkpoint info
1038   void CkMemCheckPT::report()
1039   {
1040     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1041     inCheckpointing = 0;
1042 #if !CMK_CHKP_ALL
1043     int objsize = 0;
1044     int len = ckTable.length();
1045     for (int i=0; i<len; i++) {
1046       CkCheckPTInfo *entry = ckTable[i];
1047       CmiAssert(entry);
1048       objsize += entry->getSize();
1049     }
1050     CmiAssert(CpvAccess(procChkptBuf));
1051     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1052 #else
1053     if(CkMyPe()==0)
1054       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1055 #endif
1056   }
1057
1058   /*****************************************************************************
1059     RESTART Procedure
1060    *****************************************************************************/
1061
1062   // master processor of two buddies
1063   inline int CkMemCheckPT::isMaster(int buddype)
1064   {
1065 #if 0
1066     int mype = CkMyPe();
1067     //CkPrintf("ismaster: %d %d\n", pe, mype);
1068     if (CkNumPes() - totalFailed() == 2) {
1069       return mype > buddype;
1070     }
1071     for (int i=1; i<CkNumPes(); i++) {
1072       int me = (buddype+i)%CkNumPes();
1073       if (isFailed(me)) continue;
1074       if (me == mype) return 1;
1075       else return 0;
1076     }
1077     return 0;
1078 #else
1079     // smaller one
1080     int mype = CkMyPe();
1081     //CkPrintf("ismaster: %d %d\n", pe, mype);
1082     if (CkNumPes() - totalFailed() == 2) {
1083       return mype < buddype;
1084     }
1085 #if NODE_CHECKPOINT
1086     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1087     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1088 #else
1089       for (int i=1; i<CkNumPes(); i++) {
1090 #endif
1091         int me = (mype+i)%CkNumPes();
1092         if (isFailed(me)) continue;
1093         if (me == buddype) return 1;
1094         else return 0;
1095       }
1096       return 0;
1097 #endif
1098     }
1099
1100
1101
1102 #if 0
1103     // helper class to pup all elements that belong to same ckLocMgr
1104     class ElementDestoryer : public CkLocIterator {
1105       private:
1106         CkLocMgr *locMgr;
1107       public:
1108         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1109         void addLocation(CkLocation &loc) {
1110           CkArrayIndex idx=loc.getIndex();
1111           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1112           loc.destroy();
1113         }
1114     };
1115 #endif
1116
1117     // restore the bitmap vector for LB
1118     void CkMemCheckPT::resetLB(int diepe)
1119     {
1120 #if CMK_LBDB_ON
1121       int i;
1122       char *bitmap = new char[CkNumPes()];
1123       // set processor available bitmap
1124       get_avail_vector(bitmap);
1125       for (i=0; i<failedPes.length(); i++)
1126         bitmap[failedPes[i]] = 0; 
1127       bitmap[diepe] = 0;
1128
1129 #if CK_NO_PROC_POOL
1130       set_avail_vector(bitmap);
1131 #endif
1132
1133       // if I am the crashed pe, rebuild my failedPEs array
1134       if (CkMyNode() == diepe)
1135         for (i=0; i<CkNumPes(); i++) 
1136           if (bitmap[i]==0) failed(i);
1137
1138       delete [] bitmap;
1139 #endif
1140     }
1141
1142     static double restartT;
1143
1144     // in case when failedPe dies, everybody go through its checkpoint table:
1145     // destory all array elements
1146     // recover lost buddies
1147     // reconstruct all array elements from check point data
1148     // called on all processors
1149     void CkMemCheckPT::restart(int diePe)
1150     {
1151 #if CMK_MEM_CHECKPOINT
1152       double curTime = CmiWallTimer();
1153       if (CkMyPe() == diePe){
1154         restartT = CmiWallTimer();
1155         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1156       }
1157       stage = (char*)"resetLB";
1158       startTime = curTime;
1159       if (CkMyPe() == diePe)
1160         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1161
1162 #if CK_NO_PROC_POOL
1163       failed(diePe);    // add into the list of failed pes
1164 #endif
1165       thisFailedPe = diePe;
1166
1167       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1168
1169       inRestarting = 1;
1170
1171       // disable load balancer's barrier
1172       //if (CkMyPe() != diePe) resetLB(diePe);
1173
1174       CKLOCMGR_LOOP(mgr->startInserting(););
1175
1176
1177       if(CmiNumPartition()==1){
1178         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1179       }else{
1180         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1181         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1182       }
1183 #endif
1184     }
1185
1186     // loally remove all array elements
1187     void CkMemCheckPT::removeArrayElements()
1188     {
1189 #if CMK_MEM_CHECKPOINT
1190       int len = ckTable.length();
1191       double curTime = CmiWallTimer();
1192       if (CkMyPe() == thisFailedPe) 
1193         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1194       stage = (char*)"removeArrayElements";
1195       startTime = curTime;
1196
1197       //  if (cpCallback.isInvalid()) 
1198       //          CkPrintf("invalid pe %d\n",CkMyPe());
1199       //          CkAbort("Didn't set restart callback\n");;
1200       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1201
1202       // get rid of all buffering and remote recs
1203       // including destorying all array elements
1204 #if CK_NO_PROC_POOL  
1205       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1206 #else
1207       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1208 #endif
1209       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1210 #endif
1211     }
1212
1213     // flush state in reduction manager
1214     void CkMemCheckPT::resetReductionMgr()
1215     {
1216       if (CkMyPe() == thisFailedPe) 
1217         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1218       int numGroups = CkpvAccess(_groupIDTable)->size();
1219       for(int i=0;i<numGroups;i++) {
1220         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1221         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1222         obj->flushStates();
1223         obj->ckJustMigrated();
1224       }
1225       // reset again
1226       //CpvAccess(_qd)->flushStates();
1227       if(CmiNumPartition()==1){
1228         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1229       }
1230       else
1231         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1232     }
1233
1234     // recover the lost buddies
1235     void CkMemCheckPT::recoverBuddies()
1236     {
1237       int idx;
1238       int len = ckTable.length();
1239       // ready to flush reduction manager
1240       // cannot be CkMemCheckPT::restart because destory will modify states
1241       double curTime = CmiWallTimer();
1242       if (CkMyPe() == thisFailedPe)
1243         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1244       stage = (char *)"recoverBuddies";
1245       if (CkMyPe() == thisFailedPe)
1246         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1247       startTime = curTime;
1248
1249       // recover buddies
1250       expectCount = 0;
1251 #if !CMK_CHKP_ALL
1252       for (idx=0; idx<len; idx++) {
1253         CkCheckPTInfo *entry = ckTable[idx];
1254         if (entry->pNo == thisFailedPe) {
1255 #if CK_NO_PROC_POOL
1256           // find a new buddy
1257           int budPe = BuddyPE(CkMyPe());
1258 #else
1259           int budPe = thisFailedPe;
1260 #endif
1261           CkArrayCheckPTMessage *msg = entry->getCopy();
1262           msg->bud1 = budPe;
1263           msg->bud2 = CkMyPe();
1264           msg->cp_flag = 0;            // not checkpointing
1265           thisProxy[budPe].recoverEntry(msg);
1266           expectCount ++;
1267         }
1268       }
1269 #else
1270       //send to failed pe
1271       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1272 #if CK_NO_PROC_POOL
1273         // find a new buddy
1274         int budPe = BuddyPE(CkMyPe());
1275 #else
1276         int budPe = thisFailedPe;
1277 #endif
1278         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1279         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1280         msg->cp_flag = 0;            // not checkpointing
1281         msg->bud1 = budPe;
1282         msg->bud2 = CkMyPe();
1283         thisProxy[budPe].recoverEntry(msg);
1284         expectCount ++;
1285       }
1286 #endif
1287
1288       if (expectCount == 0) {
1289         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1290       }
1291       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1292     }
1293
1294     void CkMemCheckPT::gotData()
1295     {
1296       ackCount ++;
1297       if (ackCount == expectCount) {
1298         ackCount = 0;
1299         expectCount = -1;
1300         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1301         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1302       }
1303     }
1304
1305     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1306     {
1307
1308       for (int i=0; i<n; i++) {
1309         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1310         mgr->updateLocation(idx[i], nowOnPe);
1311       }
1312       thisProxy[nowOnPe].gotReply();
1313     }
1314
1315     // restore array elements
1316     void CkMemCheckPT::recoverArrayElements()
1317     {
1318       double curTime = CmiWallTimer();
1319       int len = ckTable.length();
1320       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1321       stage = (char *)"recoverArrayElements";
1322       if (CkMyPe() == thisFailedPe)
1323         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1324       startTime = curTime;
1325       int flag = 0;
1326       // recover all array elements
1327       int count = 0;
1328
1329 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1330       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1331       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1332 #endif
1333
1334 #if !CMK_CHKP_ALL
1335       for (int idx=0; idx<len; idx++)
1336       {
1337         CkCheckPTInfo *entry = ckTable[idx];
1338 #if CK_NO_PROC_POOL
1339         // the bigger one will do 
1340         //    if (CkMyPe() < entry->pNo) continue;
1341         if (!isMaster(entry->pNo)) continue;
1342 #else
1343         // smaller one do it, which has the original object
1344         if (CkMyPe() == entry->pNo+1 || 
1345             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1346 #endif
1347         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1348
1349         entry->updateBuddy(CkMyPe(), entry->pNo);
1350         CkArrayCheckPTMessage *msg = entry->getCopy();
1351         // gzheng
1352         //thisProxy[CkMyPe()].inmem_restore(msg);
1353         inmem_restore(msg);
1354 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1355         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1356         int homePe = mgr->homePe(msg->index);
1357         if (homePe != CkMyPe()) {
1358           gmap[homePe].push_back(msg->locMgr);
1359           imap[homePe].push_back(msg->index);
1360         }
1361 #endif
1362         CkFreeMsg(msg);
1363         count ++;
1364       }
1365 #else
1366       char * packData;
1367       if(CmiNumPartition()==1){
1368         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1369         packData = (char *)msg->packData;
1370       }
1371       else{
1372         int pointer = CpvAccess(curPointer);
1373         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1374         packData = msg->packData;
1375       }
1376 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1377       recoverAll(packData,gmap,imap);
1378 #else
1379       recoverAll(packData);
1380 #endif
1381 #endif
1382 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1383       for (int i=0; i<CkNumPes(); i++) {
1384         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1385           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1386           flag++;       
1387         }
1388       }
1389       delete [] imap;
1390       delete [] gmap;
1391 #endif
1392       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1393
1394       CKLOCMGR_LOOP(mgr->doneInserting(););
1395
1396       // _crashedNode = -1;
1397       CpvAccess(_crashedNode) = -1;
1398 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1399       if (CkMyPe() == 0)
1400         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1401 #else
1402       if(flag == 0)
1403       {
1404         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1405       }
1406 #endif
1407     }
1408
1409     void CkMemCheckPT::gotReply(){
1410       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1411     }
1412
1413     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1414 #if CMK_CHKP_ALL
1415       PUP::fromMem p(packData);
1416       int numElements;
1417       p|numElements;
1418       if(p.isUnpacking()){
1419         for(int i=0;i<numElements;i++){
1420           CkGroupID gID;
1421           CkArrayIndex idx;
1422           p|gID;
1423           p|idx;
1424           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1425           int homePe = mgr->homePe(idx);
1426 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1427           mgr->resume(idx,p,CmiTrue,CmiTrue);
1428 #else
1429           if(CmiNumPartition()==1)
1430             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1431           else{
1432             if(CkMyPe()==thisFailedPe){
1433               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1434             }
1435             else{
1436               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1437             }
1438           }
1439 #endif
1440 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1441           homePe = mgr->homePe(idx);
1442           if (homePe != CkMyPe()) {
1443             gmap[homePe].push_back(gID);
1444             imap[homePe].push_back(idx);
1445           }
1446 #endif
1447         }
1448       }
1449 #endif
1450     }
1451
1452
1453     // on every processor
1454     // turn load balancer back on
1455     void CkMemCheckPT::finishUp()
1456     {
1457       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1458       //CKLOCMGR_LOOP(mgr->doneInserting(););
1459
1460       if (CkMyPe() == thisFailedPe)
1461       {
1462         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1463         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1464         fflush(stdout);
1465       }
1466       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1467       inRestarting = 0;
1468
1469 #if CMK_CONVERSE_MPI    
1470       if(CmiNumPartition()!=1){
1471         CpvAccess(recvdProcChkp) = 0;
1472         CpvAccess(recvdArrayChkp) = 0;
1473         CpvAccess(curPointer)^=1;
1474         //notify my replica, restart is done
1475         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1476         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1477         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1478       }
1479       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1480         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1481       }
1482 #endif
1483
1484 #if CK_NO_PROC_POOL
1485 #if NODE_CHECKPOINT
1486       int numnodes = CmiNumPhysicalNodes();
1487 #else
1488       int numnodes = CkNumPes();
1489 #endif
1490       if (numnodes-totalFailed() <=2) {
1491         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1492         _memChkptOn = 0;
1493       }
1494 #endif
1495     }
1496
1497     void CkMemCheckPT::recoverFromSoftFailure()
1498     {
1499       inRestarting = 0;
1500       CpvAccess(recvdRemote) = 0;
1501       CpvAccess(recvdLocal) = 0;
1502       CpvAccess(localChkpDone) = 0;
1503       CpvAccess(remoteChkpDone) = 0;
1504       inCheckpointing = 0;
1505       notifyReplica = 0;
1506       if(CkMyPe() == 0){
1507         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1508       }
1509       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1510     }
1511     // called only on 0
1512     void CkMemCheckPT::quiescence(CkCallback &cb)
1513     {
1514       static int pe_count = 0;
1515       pe_count ++;
1516       CmiAssert(CkMyPe() == 0);
1517       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1518       if (pe_count == CkNumPes()) {
1519         pe_count = 0;
1520         cb.send();
1521       }
1522     }
1523
1524     // User callable function - to start a checkpoint
1525     // callback cb is used to pass control back
1526     void CkStartMemCheckpoint(CkCallback &cb)
1527     {
1528 #if CMK_MEM_CHECKPOINT
1529       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1530       /*if (_memChkptOn == 0) {
1531         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1532         cb.send();
1533         return;
1534         }*/
1535       if (CkInRestarting()) {
1536         // trying to checkpointing during restart
1537         cb.send();
1538         return;
1539       }
1540       // store user callback and user data
1541       CkMemCheckPT::cpCallback = cb;
1542
1543       // broadcast to start check pointing
1544       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1545       checkptMgr.doItNow(CkMyPe(), cb);
1546 #else
1547       // when mem checkpoint is disabled, invike cb immediately
1548       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1549       cb.send();
1550 #endif
1551     }
1552
1553     void CkRestartCheckPoint(int diePe)
1554     {
1555       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1556       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1557       // broadcast
1558       checkptMgr.restart(diePe);
1559     }
1560
1561     static int _diePE = -1;
1562
1563     // callback function used locally by ccs handler
1564     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1565     {
1566       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1567       CkRestartCheckPoint(_diePE);
1568     }
1569
1570
1571     // called on crashed PE
1572     static void restartBeginHandler(char *msg)
1573     {
1574       CmiFree(msg);
1575 #if CMK_MEM_CHECKPOINT
1576 #if CMK_USE_BARRIER
1577       if(CkMyPe()!=_diePE){
1578         printf("restar begin on %d\n",CkMyPe());
1579         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1580         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1581         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1582       }else{
1583         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1584         CkRestartCheckPointCallback(NULL, NULL);
1585       }
1586 #else
1587       static int count = 0;
1588       CmiAssert(CkMyPe() == _diePE);
1589       count ++;
1590       if (count == CkNumPes()) {
1591         printf("restart begin on %d\n",CkMyPe());
1592         CkRestartCheckPointCallback(NULL, NULL);
1593         count = 0;
1594       }
1595 #endif
1596 #endif
1597     }
1598
1599     extern void _discard_charm_message();
1600     extern void _resume_charm_message();
1601
1602     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1603       return data;
1604     }
1605
1606     static void restartBcastHandler(char *msg)
1607     {
1608 #if CMK_MEM_CHECKPOINT
1609       // advance phase counter
1610       CkMemCheckPT::inRestarting = 1;
1611       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1612       // gzheng
1613       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1614
1615       if (CkMyPe()==_diePE)
1616         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1617
1618       // reset QD counters
1619       /*  gzheng
1620           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1621        */
1622
1623       /*  gzheng
1624           if (CkMyPe()==_diePE)
1625           CkRestartCheckPointCallback(NULL, NULL);
1626        */
1627       CmiFree(msg);
1628
1629       _resume_charm_message();
1630
1631       // reduction
1632       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1633       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1634 #if CMK_USE_BARRIER
1635       //CmiPrintf("before reduce\n");   
1636       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1637       //CmiPrintf("after reduce\n");    
1638 #else
1639       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1640 #endif 
1641       checkpointed = 0;
1642 #endif
1643     }
1644
1645     extern void _initDone();
1646
1647     bool compare(char * buf1, char *buf2){
1648       if(CkMyPe()==0)
1649         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1650       PUP::checker pchecker(buf1,buf2);
1651       pchecker.skip();
1652       int numElements;
1653       pchecker|numElements;
1654       for(int i=0;i<numElements;i++){
1655         CkGroupID gID;
1656         CkArrayIndex idx;
1657
1658         pchecker|gID;
1659         pchecker|idx;
1660
1661         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1662         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1663       }
1664       if(CkMyPe()==0)
1665         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1666       //int fault_num = pchecker.getFaultNum();
1667       bool result = pchecker.getResult();
1668       /*if(!result){
1669         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1670       }*/
1671       return result;
1672     }
1673     int getChecksum(char * buf){
1674       PUP::checker pchecker(buf);
1675       pchecker.skip();
1676       int numElements;
1677       pchecker|numElements;
1678 //      CkPrintf("num %d\n",numElements);
1679       for(int i=0;i<numElements;i++){
1680         CkGroupID gID;
1681         CkArrayIndex idx;
1682
1683         pchecker|gID;
1684         pchecker|idx;
1685
1686         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1687         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1688       }
1689       return pchecker.getChecksum();
1690     }
1691
1692     static void recvRemoteChkpHandler(char *msg){
1693 //#if CMK_USE_CHECKSUM
1694       if(CpvAccess(use_checksum)){
1695         if(CkMyPe()==0)
1696           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1697         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1698         CpvAccess(recvdRemote) = 1;
1699         if(CpvAccess(recvdLocal)==1){
1700           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1701             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1702           }
1703           else{
1704             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1705           }
1706         }
1707       }else{
1708 //#else
1709         envelope *env = (envelope *)msg;
1710         CkUnpackMessage(&env);
1711         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1712         if(CpvAccess(recvdLocal)==1){
1713           int pointer = CpvAccess(curPointer);
1714           int size = CpvAccess(chkpBuf)[pointer]->len;
1715           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1716             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1717           }else
1718           {
1719             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1720           }
1721           delete chkpMsg;
1722           if(CkMyPe()==0)
1723             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1724         }else{
1725           CpvAccess(recvdRemote) = 1;
1726           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1727           CpvAccess(buddyBuf) = chkpMsg;
1728         }
1729       }
1730 //#endif
1731     }
1732
1733     static void replicaRecoverHandler(char *msg){
1734       CpvAccess(_remoteCrashedNode) = -1;
1735       CkMemCheckPT::replicaAlive = 1;
1736       //fflush(stdout);
1737       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1738       bool ret = true;
1739       CpvAccess(remoteChkpDone) = 1;
1740       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1741       CmiFree(msg);
1742
1743     }
1744     static void replicaChkpDoneHandler(char *msg){
1745       CpvAccess(remoteChkpDone) = 1;
1746       if(CpvAccess(localChkpDone) == 1)
1747         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1748       CmiFree(msg);
1749     }
1750
1751     static void replicaDieHandler(char * msg){
1752 #if CMK_CONVERSE_MPI    
1753       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1754       CpvAccess(_remoteCrashedNode) = diePe;
1755       CkMemCheckPT::replicaAlive = 0;
1756       if(CkMyPe()==diePe){
1757         CmiPrintf("pe %d in replicad word die\n",diePe);
1758         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1759         fflush(stdout);
1760       }
1761       find_spare_mpirank(diePe,CmiMyPartition()^1);
1762 #endif
1763       //broadcast to my partition to get local max iter
1764       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1765       CmiFree(msg);
1766     }
1767
1768
1769     static void replicaDieBcastHandler(char *msg){
1770       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1771       CpvAccess(_remoteCrashedNode) = diePe;
1772       CkMemCheckPT::replicaAlive = 0;
1773       CmiFree(msg);
1774     }
1775
1776     static void recoverRemoteProcDataHandler(char *msg){
1777       envelope *env = (envelope *)msg;
1778       CkUnpackMessage(&env);
1779       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1780
1781       //store the checkpoint
1782       int pointer = procMsg->pointer;
1783
1784
1785       if(CkMyPe()==CpvAccess(_crashedNode)){
1786         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1787         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1788         PUP::fromMem p(procMsg->packData);
1789         _handleProcData(p,CmiTrue);
1790         _initDone();
1791       }
1792       else{
1793         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1794         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1795         //_handleProcData(p,CmiFalse);
1796       }
1797       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1798       CKLOCMGR_LOOP(mgr->startInserting(););
1799
1800       CpvAccess(recvdProcChkp) =1;
1801       if(CpvAccess(recvdArrayChkp)==1){
1802         _resume_charm_message();
1803         _diePE = CpvAccess(_crashedNode);
1804         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1805         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1806         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1807       }
1808     }
1809
1810     static void recoverRemoteArrayDataHandler(char *msg){
1811       envelope *env = (envelope *)msg;
1812       CkUnpackMessage(&env);
1813       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1814
1815       //store the checkpoint
1816       int pointer = chkpMsg->pointer;
1817       CpvAccess(curPointer) = pointer;
1818       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1819       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1820       CpvAccess(recvdArrayChkp) =1;
1821       CkMemCheckPT::inRestarting = 1;
1822       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1823         _resume_charm_message();
1824         _diePE = CpvAccess(_crashedNode);
1825         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1826         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1827         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1828         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1829       }
1830     }
1831
1832     static void recvPhaseHandler(char * msg)
1833     {
1834       CpvAccess(_curRestartPhase)--;
1835       CkMemCheckPT::inRestarting = 1;
1836       CmiFree(msg);
1837     }
1838     // called on crashed processor
1839     static void recoverProcDataHandler(char *msg)
1840     {
1841 #if CMK_MEM_CHECKPOINT
1842       int i;
1843       envelope *env = (envelope *)msg;
1844       CkUnpackMessage(&env);
1845       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1846       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1847       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1848       PUP::fromMem p(procMsg->packData);
1849       _handleProcData(p,CmiTrue);
1850
1851       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1852       // gzheng
1853       CKLOCMGR_LOOP(mgr->startInserting(););
1854
1855       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1856       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1857       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1858       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1859
1860       _initDone();
1861       //   CpvAccess(_qd)->flushStates();
1862       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1863 #endif
1864     }
1865     //for replica, got the phase number from my neighbor processor in the current partition
1866     static void askPhaseHandler(char *msg)
1867     {
1868 #if CMK_MEM_CHECKPOINT
1869       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1870       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1871       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1872       CmiSetHandler(msg, recvPhaseHandlerIdx);
1873       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1874 #endif
1875     }
1876     // called on its backup processor
1877     // get backup message buffer and sent to crashed processor
1878     static void askProcDataHandler(char *msg)
1879     {
1880 #if CMK_MEM_CHECKPOINT
1881       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1882       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1883       if (CpvAccess(procChkptBuf) == NULL)  {
1884         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1885         CkAbort("no checkpoint found");
1886       }
1887       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1888       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1889
1890       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1891
1892       CkPackMessage(&env);
1893       CmiSetHandler(env, recoverProcDataHandlerIdx);
1894       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1895       CpvAccess(procChkptBuf) = NULL;
1896       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1897 #endif
1898     }
1899
1900     // called on PE 0
1901     void qd_callback(void *m)
1902     {
1903       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1904       fflush(stdout);
1905       CkFreeMsg(m);
1906       if(CmiNumPartition()==1){
1907 #ifdef CMK_SMP
1908         for(int i=0;i<CmiMyNodeSize();i++){
1909           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1910           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1911           CmiSetHandler(msg, askProcDataHandlerIdx);
1912           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1913           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1914         }
1915         return;
1916 #endif
1917         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1918         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1919         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1920         CmiSetHandler(msg, askProcDataHandlerIdx);
1921         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1922         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1923       }
1924       else{
1925         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1926         CmiSetHandler(msg, recvPhaseHandlerIdx);
1927         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1928       }
1929     }
1930
1931     // on crashed node
1932     void CkMemRestart(const char *dummy, CkArgMsg *args)
1933     {
1934 #if CMK_MEM_CHECKPOINT
1935       _diePE = CmiMyNode();
1936       CpvAccess( _crashedNode )= CmiMyNode();
1937       CkMemCheckPT::inRestarting = 1;
1938       _discard_charm_message();
1939
1940       /*if(CmiMyRank()==0){
1941         CkCallback cb(qd_callback);
1942         CkStartQD(cb);
1943         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1944         }*/
1945       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1946       if(CmiNumPartition()==1){
1947         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1948         restartT = CmiWallTimer();
1949         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1950         fflush(stdout);
1951         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1952         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1953         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1954         CmiSetHandler(msg, askProcDataHandlerIdx);
1955         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1956         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1957       }
1958       else{
1959         CkCallback cb(qd_callback);
1960         CkStartQD(cb);
1961       }
1962 #else
1963       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1964 #endif
1965     }
1966
1967     // can be called in other files
1968     // return true if it is in restarting
1969     extern "C"
1970       int CkInRestarting()
1971       {
1972 #if CMK_MEM_CHECKPOINT
1973         if (CpvAccess( _crashedNode)!=-1) return 1;
1974         // gzheng
1975         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1976         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1977         return CkMemCheckPT::inRestarting;
1978 #else
1979         return 0;
1980 #endif
1981       }
1982
1983     extern "C"
1984       int CkReplicaAlive()
1985       {
1986         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1987         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1988         return CkMemCheckPT::replicaAlive;
1989
1990         /*if(CkMemCheckPT::replicaDead==1)
1991           return 0;
1992           else          
1993           return 1;*/
1994       }
1995
1996     extern "C"
1997       int CkInCheckpointing()
1998       {
1999         return CkMemCheckPT::inCheckpointing;
2000       }
2001
2002     extern "C"
2003       void CkSetInLdb(){
2004 #if CMK_MEM_CHECKPOINT
2005         CkMemCheckPT::inLoadbalancing = 1;
2006 #endif
2007       }
2008
2009     extern "C"
2010       int CkInLdb(){
2011 #if CMK_MEM_CHECKPOINT
2012         return CkMemCheckPT::inLoadbalancing;
2013 #endif
2014         return 0;
2015       }
2016
2017     extern "C"
2018       void CkResetInLdb(){
2019 #if CMK_MEM_CHECKPOINT
2020         CkMemCheckPT::inLoadbalancing = 0;
2021 #endif
2022       }
2023
2024     /*****************************************************************************
2025       module initialization
2026      *****************************************************************************/
2027
2028     static int arg_where = CkCheckPoint_inMEM;
2029
2030 #if CMK_MEM_CHECKPOINT
2031     void init_memcheckpt(char **argv)
2032     {
2033       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2034         arg_where = CkCheckPoint_inDISK;
2035       }
2036       CpvInitialize(int, use_checksum);
2037       CpvAccess(use_checksum)=0;
2038       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2039         CpvAccess(use_checksum)=1;
2040       }
2041       // initiliazing _crashedNode variable
2042       CpvInitialize(int, _crashedNode);
2043       CpvInitialize(int, _remoteCrashedNode);
2044       CpvAccess(_crashedNode) = -1;
2045       CpvAccess(_remoteCrashedNode) = -1;
2046       init_FI(argv);
2047     }
2048 #endif
2049
2050     class CkMemCheckPTInit: public Chare {
2051       public:
2052         CkMemCheckPTInit(CkArgMsg *m) {
2053 #if CMK_MEM_CHECKPOINT
2054           if (arg_where == CkCheckPoint_inDISK) {
2055             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2056           }
2057           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2058           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2059 #endif
2060         }
2061     };
2062
2063     static void notifyHandler(char *msg)
2064     {
2065 #if CMK_MEM_CHECKPOINT
2066       CmiFree(msg);
2067       /* immediately increase restart phase to filter old messages */
2068       CpvAccess(_curRestartPhase) ++;
2069       CpvAccess(_qd)->flushStates();
2070       _discard_charm_message();
2071
2072 #endif
2073     }
2074
2075     extern "C"
2076       void notify_crash(int node)
2077       {
2078 #ifdef CMK_MEM_CHECKPOINT
2079         CpvAccess( _crashedNode) = node;
2080 #ifdef CMK_SMP
2081         for(int i=0;i<CkMyNodeSize();i++){
2082           CpvAccessOther(_crashedNode,i)=node;
2083         }
2084 #endif
2085         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2086         CkMemCheckPT::inRestarting = 1;
2087
2088         // this may be in interrupt handler, send a message to reset QD
2089         int pe = CmiNodeFirst(CkMyNode());
2090         for(int i=0;i<CkMyNodeSize();i++){
2091           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2092           CmiSetHandler(msg, notifyHandlerIdx);
2093           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2094         }
2095 #endif
2096       }
2097
2098     extern "C" void (*notify_crash_fn)(int node);
2099
2100
2101 #if CMK_CONVERSE_MPI
2102     void buddyDieHandler(char *msg)
2103     {
2104 #if CMK_MEM_CHECKPOINT
2105       // notify
2106       CkMemCheckPT::inRestarting = 1;
2107       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2108       notify_crash(diepe);
2109       // send message to crash pe to let it restart
2110       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2111       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2112       int buddy = obj->BuddyPE(CmiMyPe());
2113       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2114       if (buddy == diepe)  {
2115         mpi_restart_crashed(diepe, newrank);
2116       }
2117 #endif
2118     }
2119
2120     void pingHandler(void *msg)
2121     {
2122       lastPingTime = CmiWallTimer();
2123       CmiFree(msg);
2124     }
2125
2126     void pingCheckHandler()
2127     {
2128 #if CMK_MEM_CHECKPOINT
2129       double now = CmiWallTimer();
2130       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2131         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2132         int i, pe, buddy;
2133         // tell everyone the buddy dies
2134         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2135         for (i = 1; i < CmiNumPes(); i++) {
2136           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2137           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2138         }
2139         buddy = pe;
2140         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2141         fflush(stdout);
2142         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2143         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2144         CmiSetHandler(msg, buddyDieHandlerIdx);
2145         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2146         //send to everyone in the other world
2147         if(CmiNumPartition()!=1){
2148           for(int i=0;i<CmiNumPes();i++){
2149             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2150             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2151             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2152             //fflush(stdout);
2153             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2154             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2155           }
2156         }
2157       }
2158         else 
2159           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2160 #endif
2161       }
2162
2163       void pingBuddy()
2164       {
2165 #if CMK_MEM_CHECKPOINT
2166         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2167         if (obj) {
2168           int buddy = obj->BuddyPE(CkMyPe());
2169           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2170           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2171           CmiSetHandler(msg, pingHandlerIdx);
2172           CmiGetRestartPhase(msg) = 9999;
2173           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2174         }
2175         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2176 #endif
2177       }
2178 #endif
2179
2180       // initproc
2181       void CkRegisterRestartHandler( )
2182       {
2183 #if CMK_MEM_CHECKPOINT
2184         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2185         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2186         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2187         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2188         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2189
2190         //for replica
2191         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2192         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2193         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2194         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2195         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2196         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2197         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2198         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2199         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2200
2201 #if CMK_CONVERSE_MPI
2202         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2203         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2204         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2205 #endif
2206
2207         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2208         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2209         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2210         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2211         CpvInitialize(int,curPointer);
2212         CpvInitialize(int,recvdLocal);
2213         CpvInitialize(int,localChkpDone);
2214         CpvInitialize(int,remoteChkpDone);
2215         CpvInitialize(int,recvdRemote);
2216         CpvInitialize(int,recvdProcChkp);
2217         CpvInitialize(int,localChecksum);
2218         CpvInitialize(int,remoteChecksum);
2219         CpvInitialize(int,recvdArrayChkp);
2220
2221         CpvAccess(procChkptBuf) = NULL;
2222         CpvAccess(buddyBuf) = NULL;
2223         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2224         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2225         CpvAccess(chkpBuf)[0] = NULL;
2226         CpvAccess(chkpBuf)[1] = NULL;
2227         CpvAccess(localProcChkpBuf)[0] = NULL;
2228         CpvAccess(localProcChkpBuf)[1] = NULL;
2229
2230         CpvAccess(curPointer) = 0;
2231         CpvAccess(recvdLocal) = 0;
2232         CpvAccess(localChkpDone) = 0;
2233         CpvAccess(remoteChkpDone) = 0;
2234         CpvAccess(recvdRemote) = 0;
2235         CpvAccess(recvdProcChkp) = 0;
2236         CpvAccess(localChecksum) = 0;
2237         CpvAccess(remoteChecksum) = 0;
2238         CpvAccess(recvdArrayChkp) = 0;
2239
2240         notify_crash_fn = notify_crash;
2241
2242 #if ! CMK_CONVERSE_MPI
2243         // print pid to kill
2244         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2245         //  sleep(4);
2246 #endif
2247 #endif
2248       }
2249
2250
2251       extern "C"
2252         int CkHasCheckpoints()
2253         {
2254           return checkpointed;
2255         }
2256
2257       /// @todo: the following definitions should be moved to a separate file containing
2258       // structures and functions about fault tolerance strategies
2259
2260       /**
2261        *  * @brief: function for killing a process                                             
2262        *   */
2263 #ifdef CMK_MEM_CHECKPOINT
2264 #if CMK_HAS_GETPID
2265       void killLocal(void *_dummy,double curWallTime){
2266         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2267         if(CmiWallTimer()<killTime-1){
2268           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2269         }else{ 
2270 #if CMK_CONVERSE_MPI
2271           CkDieNow();
2272 #else 
2273           kill(getpid(),SIGKILL);                                               
2274 #endif
2275         }              
2276       } 
2277 #else
2278       void killLocal(void *_dummy,double curWallTime){
2279         CmiAbort("kill() not supported!");
2280       }
2281 #endif
2282 #endif
2283
2284 #ifdef CMK_MEM_CHECKPOINT
2285       /**
2286        * @brief: reads the file with the kill information
2287        */
2288       void readKillFile(){
2289         FILE *fp=fopen(killFile,"r");
2290         if(!fp){
2291           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2292           return;
2293         }
2294         int proc;
2295         double sec;
2296         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2297           if(proc == CkMyNode() && CkMyRank() == 0){
2298             killTime = CmiWallTimer()+sec;
2299             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2300             CcdCallFnAfter(killLocal,NULL,sec*1000);
2301           }
2302         }
2303         fclose(fp);
2304       }
2305
2306 #if ! CMK_CONVERSE_MPI
2307       void CkDieNow()
2308       {
2309 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2310         // ignored for non-mpi version
2311         CmiPrintf("[%d] die now.\n", CmiMyPe());
2312         killTime = CmiWallTimer()+0.001;
2313         CcdCallFnAfter(killLocal,NULL,1);
2314 #endif
2315       }
2316 #endif
2317
2318 #endif
2319
2320 #include "CkMemCheckpoint.def.h"
2321
2322