checksum support
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, _remoteCrashedNode);
77
78 // static, so that it is accessible from Converse part
79 int CkMemCheckPT::inRestarting = 0;
80 int CkMemCheckPT::inCheckpointing = 0;
81 int CkMemCheckPT::replicaAlive = 1;
82 int CkMemCheckPT::inLoadbalancing = 0;
83 double CkMemCheckPT::startTime;
84 char *CkMemCheckPT::stage;
85 CkCallback CkMemCheckPT::cpCallback;
86
87 int _memChkptOn = 1;                    // checkpoint is on or off
88
89 CkGroupID ckCheckPTGroupID;             // readonly
90
91 static int checkpointed = 0;
92
93 /// @todo the following declarations should be moved into a separate file for all 
94 // fault tolerant strategies
95
96 #ifdef CMK_MEM_CHECKPOINT
97 // name of the kill file that contains processes to be killed 
98 char *killFile;                                               
99 // flag for the kill file         
100 int killFlag=0;
101 // variable for storing the killing time
102 double killTime=0.0;
103 #endif
104
105 #ifdef CKLOCMGR_LOOP
106 #undef CKLOCMGR_LOOP
107 #endif
108 // loop over all CkLocMgr and do "code"
109 #define  CKLOCMGR_LOOP(code)    {       \
110   int numGroups = CkpvAccess(_groupIDTable)->size();    \
111   for(int i=0;i<numGroups;i++) {        \
112     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
113     if(obj->isLocMgr())  {      \
114       CkLocMgr *mgr = (CkLocMgr*)obj;   \
115       code      \
116     }   \
117   }     \
118 }
119
120 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
121 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
122 CpvDeclare(CkCheckPTMessage**, chkpBuf);
123 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
124 //store the checkpoint of the buddy to compare
125 //do not need the whole msg, can be the checksum
126 CpvDeclare(CkCheckPTMessage*, buddyBuf);
127 //pointer of the checkpoint going to be written
128 CpvDeclare(int, curPointer);
129 CpvDeclare(int, recvdRemote);
130 CpvDeclare(int, recvdLocal);
131 CpvDeclare(int, localChkpDone);
132 CpvDeclare(int, remoteChkpDone);
133 CpvDeclare(int, recvdArrayChkp);
134 CpvDeclare(int, recvdProcChkp);
135 CpvDeclare(int, localChecksum);
136 CpvDeclare(int, remoteChecksum);
137
138 bool compare(char * buf1, char * buf2);
139 int getChecksum(char * buf);
140 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
141 // Converse function handles
142 static int askPhaseHandlerIdx;
143 static int recvPhaseHandlerIdx;
144 static int askProcDataHandlerIdx;
145 static int restartBcastHandlerIdx;
146 static int recoverProcDataHandlerIdx;
147 static int restartBeginHandlerIdx;
148 static int recvRemoteChkpHandlerIdx;
149 static int replicaDieHandlerIdx;
150 static int replicaDieBcastHandlerIdx;
151 static int replicaRecoverHandlerIdx;
152 static int replicaChkpDoneHandlerIdx;
153 static int recoverRemoteProcDataHandlerIdx;
154 static int recoverRemoteArrayDataHandlerIdx;
155 static int notifyHandlerIdx;
156 // compute the backup processor
157 // FIXME: avoid crashed processors
158 #if CMK_CONVERSE_MPI
159 static int pingHandlerIdx;
160 static int pingCheckHandlerIdx;
161 static int buddyDieHandlerIdx;
162 static double lastPingTime = -1;
163
164 extern "C" void mpi_restart_crashed(int pe, int rank);
165 extern "C" int  find_spare_mpirank(int pe,int partition);
166
167 void pingBuddy();
168 void pingCheckHandler();
169
170 #endif
171 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
172
173 inline int CkMemCheckPT::BuddyPE(int pe)
174 {
175   int budpe;
176 #if NODE_CHECKPOINT
177   // buddy is the processor with same rank on the next physical node
178   int r1 = CmiPhysicalRank(pe);
179   int budnode = CmiPhysicalNodeID(pe);
180   do {
181     budnode = (budnode+1)%CmiNumPhysicalNodes();
182     int *pelist;
183     int num;
184     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
185     budpe = pelist[r1 % num];
186   } while (isFailed(budpe));
187   if (budpe == pe) {
188     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
189     CmiAbort("Failed to find a buddy processor");
190   }
191 #else
192   budpe = pe;
193   budpe = (budpe+1)%CkNumPes();
194 #endif
195   return budpe;
196 }
197
198 // called in array element constructor
199 // choose and register with 2 buddies for checkpoiting 
200 #if CMK_MEM_CHECKPOINT
201 void ArrayElement::init_checkpt() {
202   if (_memChkptOn == 0) return;
203   if (CkInRestarting()) {
204     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
205   }
206   // only master init checkpoint
207   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
208
209   budPEs[0] = CkMyPe();
210   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
211   CmiAssert(budPEs[0] != budPEs[1]);
212   // inform checkPTMgr
213   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
214   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
215   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
216   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
217 }
218 #endif
219
220 // entry function invoked by checkpoint mgr asking for checkpoint data
221 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
222 #if CMK_MEM_CHECKPOINT
223   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
224   //char index[128];   thisIndexMax.sprint(index);
225   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
226   CkLocMgr *locMgr = thisArray->getLocMgr();
227   CmiAssert(myRec!=NULL);
228   int size;
229   {
230     PUP::sizer p;
231     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
232     size = p.size();
233   }
234   int packSize = size/sizeof(double) +1;
235   CkArrayCheckPTMessage *msg =
236     new (packSize, 0) CkArrayCheckPTMessage;
237   msg->len = size;
238   msg->index =thisIndexMax;
239   msg->aid = thisArrayID;
240   msg->locMgr = locMgr->getGroupID();
241   msg->cp_flag = 1;
242   {
243     PUP::toMem p(msg->packData);
244     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
245   }
246
247   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
248   checkptMgr.recvData(msg, 2, budPEs);
249   delete m;
250 #endif
251 }
252
253 // checkpoint holder class - for memory checkpointing
254 class CkMemCheckPTInfo: public CkCheckPTInfo
255 {
256   CkArrayCheckPTMessage *ckBuffer;
257   public:
258   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
259     CkCheckPTInfo(a, loc, idx, pno)
260   {
261     ckBuffer = NULL;
262   }
263   ~CkMemCheckPTInfo() 
264   {
265     if (ckBuffer) delete ckBuffer; 
266   }
267   inline void updateBuffer(CkArrayCheckPTMessage *data) 
268   {
269     CmiAssert(data!=NULL);
270     if (ckBuffer) delete ckBuffer;
271     ckBuffer = data;
272   }    
273   inline CkArrayCheckPTMessage * getCopy()
274   {
275     if (ckBuffer == NULL) {
276       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
277       CmiAbort("Abort!");
278     }
279     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
280   }     
281   inline void updateBuddy(int b1, int b2) {
282     CmiAssert(ckBuffer);
283     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
284     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
285     CmiAssert(pNo != CkMyPe());
286   }
287   inline int getSize() { 
288     CmiAssert(ckBuffer);
289     return ckBuffer->len; 
290   }
291 };
292
293 // checkpoint holder class - for in-disk checkpointing
294 class CkDiskCheckPTInfo: public CkCheckPTInfo 
295 {
296   char *fname;
297   int bud1, bud2;
298   int len;                      // checkpoint size
299   public:
300   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
301   {
302 #if CMK_USE_MKSTEMP
303     fname = new char[64];
304     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
305     mkstemp(fname);
306 #else
307     fname=tmpnam(NULL);
308 #endif
309     bud1 = bud2 = -1;
310     len = 0;
311   }
312   ~CkDiskCheckPTInfo() 
313   {
314     remove(fname);
315   }
316   inline void updateBuffer(CkArrayCheckPTMessage *data) 
317   {
318     double t = CmiWallTimer();
319     // unpack it
320     envelope *env = UsrToEnv(data);
321     CkUnpackMessage(&env);
322     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
323     FILE *f = fopen(fname,"wb");
324     PUP::toDisk p(f);
325     CkPupMessage(p, (void **)&data);
326     // delay sync to the end because otherwise the messages are blocked
327     //    fsync(fileno(f));
328     fclose(f);
329     bud1 = data->bud1;
330     bud2 = data->bud2;
331     len = data->len;
332     delete data;
333     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
334   }
335   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
336   {
337     CkArrayCheckPTMessage *data;
338     FILE *f = fopen(fname,"rb");
339     PUP::fromDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     fclose(f);
342     data->bud1 = bud1;                          // update the buddies
343     data->bud2 = bud2;
344     return data;
345   }
346   inline void updateBuddy(int b1, int b2) {
347     bud1 = b1; bud2 = b2;
348     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
349     CmiAssert(pNo != CkMyPe());
350   }
351   inline int getSize() { 
352     return len; 
353   }
354 };
355
356 CkMemCheckPT::CkMemCheckPT(int w)
357 {
358   int numnodes = 0;
359 #if NODE_CHECKPOINT
360   numnodes = CmiNumPhysicalNodes();
361 #else
362   numnodes = CkNumPes();
363 #endif
364 #if CK_NO_PROC_POOL
365   if (numnodes <= 2)
366 #else
367     if (numnodes  == 1)
368 #endif
369     {
370       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
371       _memChkptOn = 0;
372     }
373   inRestarting = 0;
374   inCheckpointing = 0;
375   recvCount = peCount = 0;
376   ackCount = 0;
377   expectCount = -1;
378   where = w;
379   replicaAlive = 1;
380   notifyReplica = 0;
381 #if CMK_CONVERSE_MPI
382   void pingBuddy();
383   void pingCheckHandler();
384   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
385   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
386 #endif
387   chkpTable[0] = NULL;
388   chkpTable[1] = NULL;
389   maxIter = -1;
390   recvIterCount = 0;
391   localDecided = false;
392 }
393
394 CkMemCheckPT::~CkMemCheckPT()
395 {
396   int len = ckTable.length();
397   for (int i=0; i<len; i++) {
398     delete ckTable[i];
399   }
400 }
401
402 void CkMemCheckPT::pup(PUP::er& p) 
403
404   CBase_CkMemCheckPT::pup(p); 
405   p|cpStarter;
406   p|thisFailedPe;
407   p|failedPes;
408   p|ckCheckPTGroupID;           // recover global variable
409   p|cpCallback;                 // store callback
410   p|where;                      // where to checkpoint
411   p|peCount;
412   if (p.isUnpacking()) {
413     recvCount = 0;
414 #if CMK_CONVERSE_MPI
415     void pingBuddy();
416     void pingCheckHandler();
417     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
418     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
419 #endif
420     maxIter = -1;
421     recvIterCount = 0;
422     localDecided = false;
423   }
424 }
425
426 void CkMemCheckPT::getIter(){
427   localDecided = true;
428   localMaxIter = maxIter+1;
429   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
430   int elemCount = CkCountChkpSyncElements();
431   if(elemCount == 0){
432     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
433   }
434 }
435
436 //when one replica failes, decide the next chkp iter
437
438 void CkMemCheckPT::recvIter(int iter){
439   if(maxIter<iter){
440     maxIter=iter;
441   }
442 }
443
444 void CkMemCheckPT::recvMaxIter(int iter){
445   localDecided = false;
446   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
447 }
448
449 void CkMemCheckPT::reachChkpIter(){
450   recvIterCount++;
451   elemCount = CkCountChkpSyncElements();
452   if(recvIterCount == elemCount){
453     recvIterCount = 0;
454     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
455   }
456 }
457
458 void CkMemCheckPT::startChkp(){
459   CkPrintf("start checkpoint\n");
460   CkStartMemCheckpoint(cpCallback);
461 }
462
463 // called by checkpoint mgr to restore an array element
464 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
465 {
466 #if CMK_MEM_CHECKPOINT
467   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
468   // m->index.print();
469   PUP::fromMem p(m->packData);
470   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
471   CmiAssert(mgr);
472 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
473   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
474 #else
475   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
476 #endif
477
478   // find a list of array elements bound together
479   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
480   CmiAssert(elt);
481   CkLocRec_local *rec = elt->myRec;
482   CkVec<CkMigratable *> list;
483   mgr->migratableList(rec, list);
484   CmiAssert(list.length() > 0);
485   for (int l=0; l<list.length(); l++) {
486     elt = (ArrayElement *)list[l];
487     elt->budPEs[0] = m->bud1;
488     elt->budPEs[1] = m->bud2;
489     //    reset, may not needed now
490     // for now.
491     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
492       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
493       if (c) c->redNo = 0;
494     }
495   }
496 #endif
497 }
498
499 // return 1 if pe is a crashed processor
500 int CkMemCheckPT::isFailed(int pe)
501 {
502   for (int i=0; i<failedPes.length(); i++)
503     if (failedPes[i] == pe) return 1;
504   return 0;
505 }
506
507 // add pe into history list of all failed processors
508 void CkMemCheckPT::failed(int pe)
509 {
510   if (isFailed(pe)) return;
511   failedPes.push_back(pe);
512 }
513
514 int CkMemCheckPT::totalFailed()
515 {
516   return failedPes.length();
517 }
518
519 // create an checkpoint entry for array element of aid with index.
520 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
521 {
522   // error check, no duplicate
523   int idx, len = ckTable.size();
524   for (idx=0; idx<len; idx++) {
525     CkCheckPTInfo *entry = ckTable[idx];
526     if (index == entry->index) {
527       if (loc == entry->locMgr) {
528         // bindTo array elements
529         return;
530       }
531       // for array inheritance, the following check may fail
532       // because ArrayElement constructor of all superclasses are called
533       if (aid == entry->aid) {
534         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
535         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
536       }
537     }
538   }
539   CkCheckPTInfo *newEntry;
540   if (where == CkCheckPoint_inMEM)
541     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
542   else
543     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
544   ckTable.push_back(newEntry);
545   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
546 }
547
548 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
549 {
550 #if !CMK_CHKP_ALL       
551   int buddy = msg->bud1;
552   if (buddy == CkMyPe()) buddy = msg->bud2;
553   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
554   recvData(msg);
555   // ack
556   thisProxy[buddy].gotData();
557 #else
558   chkpTable[0] = NULL;
559   chkpTable[1] = NULL;
560   recvArrayCheckpoint(msg);
561   thisProxy[msg->bud2].gotData();
562 #endif
563 }
564
565 // loop through my checkpoint table and ask checkpointed array elements
566 // to send me checkpoint data.
567 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
568 {
569   checkpointed = 1;
570   cpCallback = cb;
571   cpStarter = starter;
572   inCheckpointing = 1;
573   if (CkMyPe() == cpStarter) {
574     startTime = CmiWallTimer();
575     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
576   }
577 #if CMK_CONVERSE_MPI
578   if(CmiNumPartition()==1)
579 #endif    
580   {
581
582 #if !CMK_CHKP_ALL
583     int len = ckTable.length();
584     for (int i=0; i<len; i++) {
585       CkCheckPTInfo *entry = ckTable[i];
586       // always let the bigger number processor send request
587       //if (CkMyPe() < entry->pNo) continue;
588       // always let the smaller number processor send request, may on same proc
589       if (!isMaster(entry->pNo)) continue;
590       // call inmem_checkpoint to the array element, ask it to send
591       // back checkpoint data via recvData().
592       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
593       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
594     }
595     // if my table is empty, then I am done
596     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
597 #else
598     startArrayCheckpoint();
599 #endif
600     sendProcData();
601   }
602 #if CMK_CONVERSE_MPI
603   else
604   {
605     startCheckpoint();
606   }
607 #endif
608   // pack and send proc level data
609 }
610
611 class MemElementPacker : public CkLocIterator{
612   private:
613     //    CkLocMgr *locMgr;
614     PUP::er &p;
615     std::map<CkHashCode,CkLocation> arrayMap;
616   public:
617     MemElementPacker(PUP::er &p_):p(p_){};
618     void addLocation(CkLocation &loc){
619       CkArrayIndexMax idx = loc.getIndex();
620       arrayMap[idx.hash()] = loc;
621     }
622     void writeCheckpoint(){
623       std::map<CkHashCode, CkLocation>::iterator it;
624       for(it = arrayMap.begin();it!=arrayMap.end();it++){
625         if(CkMyPe()==0){
626           //                            CkPrintf("[%d][%d] pack %d\n",CmiMyPartition(),CkMyPe(),it->first);
627         }
628         CkLocation loc = it->second;
629         CkLocMgr *locMgr = loc.getManager();
630         CkArrayIndexMax idx = loc.getIndex();
631         CkGroupID gID = locMgr->ckGetGroupID();
632         p|gID;
633         p|idx;
634         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
635       }
636     }
637 };
638
639 void pupAllElements(PUP::er &p){
640 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
641   int numElements;
642   if(!p.isUnpacking()){
643     numElements = CkCountArrayElements();
644   }
645   p | numElements;
646   if(!p.isUnpacking()){
647     MemElementPacker packer(p);
648     CKLOCMGR_LOOP(mgr->iterate(packer););
649     packer.writeCheckpoint();
650   }
651 #endif
652 }
653
654 void CkMemCheckPT::startArrayCheckpoint(){
655 #if CMK_CHKP_ALL
656   int size;
657   {
658     PUP::sizer psizer;
659     pupAllElements(psizer);
660     size = psizer.size();
661   }
662   int packSize = size/sizeof(double)+1;
663   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
664   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
665   msg->len = size;
666   msg->cp_flag = 1;
667   int budPEs[2];
668   msg->bud1=CkMyPe();
669   msg->bud2=ChkptOnPe(CkMyPe());
670   {
671     PUP::toMem p(msg->packData);
672     pupAllElements(p);
673   }
674   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
675   if(chkpTable[0]) delete chkpTable[0];
676   chkpTable[0] = msg;
677   //send the checkpoint to my 
678   recvCount++;
679 #endif
680 }
681
682 void CkMemCheckPT::startCheckpoint(){
683 #if CMK_CONVERSE_MPI
684   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
685     CkPrintf("in start checkpointing!!!!\n");
686   int size;
687   {
688     PUP::sizer p;
689     _handleProcData(p,CmiFalse);
690     size = p.size();
691   }
692   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
693   procMsg->len = size;
694   procMsg->cp_flag = 1;
695   {
696     PUP::toMem p(procMsg->packData);
697     _handleProcData(p,CmiFalse);
698   }
699   int pointer = CpvAccess(curPointer);
700   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
701   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
702
703   {
704     PUP::sizer psizer;
705     pupAllElements(psizer);
706     size = psizer.size();
707   }
708   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
709   msg->len = size;
710   msg->cp_flag = 1;
711   {
712     PUP::toMem p(msg->packData);
713     pupAllElements(p);
714   }
715   pointer = CpvAccess(curPointer);
716   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
717   CpvAccess(chkpBuf)[pointer] = msg;
718   if(CkMyPe()==0)
719     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
720   if(CkReplicaAlive()==1){
721     CpvAccess(recvdLocal) = 1;
722 #if CMK_USE_CHECKSUM
723     CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
724     CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
725     CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),CpvAccess(localChecksum));
726     delete tmpMsg;
727     //send checksum
728     char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
729     *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
730     CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
731     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
732 #else
733     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
734     CkPackMessage(&env);
735     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
736     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
737 #endif
738   }
739   if(CpvAccess(recvdRemote)==1){
740     //compare the checkpoint 
741     int size = CpvAccess(chkpBuf)[pointer]->len;
742 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
743 #if CMK_USE_CHECKSUM
744     if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
745       thisProxy[CkMyPe()].doneComparison(true);
746     }
747 #else
748     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
749       thisProxy[CkMyPe()].doneComparison(true);
750     }
751 #endif
752     else{
753       CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
754       thisProxy[CkMyPe()].doneComparison(false);
755     }
756     if(CkMyPe()==0)
757       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
758   }
759   else{
760     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
761       { 
762         int pointer = CpvAccess(curPointer);
763         //send the proc data
764         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
765         procMsg->pointer = pointer;
766         envelope * env = (envelope *)(UsrToEnv(procMsg));
767         CkPackMessage(&env);
768         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
769         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
770         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
771           CkPrintf("[%d] sendProcdata\n",CkMyPe());
772         }
773       }
774       //send the array checkpoint data
775       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
776       msg->pointer = CpvAccess(curPointer);
777       envelope * env = (envelope *)(UsrToEnv(msg));
778       CkPackMessage(&env);
779       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
780       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
781       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
782         CkPrintf("[%d] sendArraydata\n",CkMyPe());
783       //can continue work, no need to wait for my replica
784     }
785   }
786 #endif
787 }
788
789 void CkMemCheckPT::doneComparison(bool ret){
790   int _ret = 1;
791   if(!ret){
792     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
793     _ret = 0;
794   }else{
795     _ret = 1;
796   }
797   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
798   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
799 }
800
801 void CkMemCheckPT::doneRComparison(int ret){
802   //    if(CpvAccess(curPointer) == 0){
803   if(ret==CkNumPes()){
804     CpvAccess(localChkpDone) = 1;
805     if(CpvAccess(remoteChkpDone) ==1){
806       thisProxy.doneBothComparison();
807     }
808     if(notifyReplica == 0){
809       //notify the replica am done
810       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
811       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
812       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
813       notifyReplica = 1;
814     }
815   }
816   else{
817     CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
818     thisProxy.RollBack();
819   }
820 }
821
822 void CkMemCheckPT::doneBothComparison(){
823   CpvAccess(recvdRemote) = 0;
824   CpvAccess(recvdLocal) = 0;
825   CpvAccess(localChkpDone) = 0;
826   CpvAccess(remoteChkpDone) = 0;
827   CpvAccess(curPointer)^=1;
828   inCheckpointing = 0;
829   notifyReplica = 0;
830   if(CkMyPe() == 0){
831     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
832   }
833   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
834 }
835
836 void CkMemCheckPT::RollBack(){
837   //restore group data
838   checkpointed = 0;
839   CkMemCheckPT::inRestarting = 1;
840   int pointer = CpvAccess(curPointer)^1;//use the previous one
841   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
842   PUP::fromMem p(chkpMsg->packData);    
843
844   //destroy array elements
845   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
846   int numGroups = CkpvAccess(_groupIDTable)->size();
847   for(int i=0;i<numGroups;i++) {
848     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
849     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
850     obj->flushStates();
851     obj->ckJustMigrated();
852   }
853   //restore array elements
854
855   int numElements;
856   p|numElements;
857
858   if(p.isUnpacking()){
859     for(int i=0;i<numElements;i++){
860       //for(int i=0;i<1;i++){
861       CkGroupID gID;
862       CkArrayIndex idx;
863       p|gID;
864       p|idx;
865       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
866       CmiAssert(mgr);
867       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
868     }
869     }
870     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
871     contribute(cb);
872   }
873
874   void CkMemCheckPT::notifyReplicaDie(int pe){
875     //CkPrintf("[%d] receive replica die\n",CkMyPe());
876     replicaAlive = 0;
877     CpvAccess(_remoteCrashedNode) = pe;
878   }
879
880   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
881   {
882 #if CMK_CHKP_ALL
883     int idx = 1;
884     if(msg->bud1 == CkMyPe()){
885       idx = 0;
886     }
887     int isChkpting = msg->cp_flag;
888     if(isChkpting == 1){
889       if(chkpTable[idx]) delete chkpTable[idx];
890     }
891     chkpTable[idx] = msg;
892     if(isChkpting){
893       recvCount++;
894       if(recvCount == 2){
895         if (where == CkCheckPoint_inMEM) {
896           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
897         }
898         else if (where == CkCheckPoint_inDISK) {
899           // another barrier for finalize the writing using fsync
900           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
901           contribute(0,NULL,CkReduction::sum_int,localcb);
902         }
903         else
904           CmiAbort("Unknown checkpoint scheme");
905         recvCount = 0;
906       }
907     }
908 #endif
909   }
910
911   // don't handle array elements
912   static inline void _handleProcData(PUP::er &p, CmiBool create)
913   {
914     // save readonlys, and callback BTW
915     CkPupROData(p);
916
917     // save mainchares 
918     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
919
920 #ifndef CMK_CHARE_USE_PTR
921     // save non-migratable chare
922     CkPupChareData(p);
923 #endif
924
925     // save groups into Groups.dat
926     CkPupGroupData(p,create);
927
928     // save nodegroups into NodeGroups.dat
929     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
930   }
931
932   void CkMemCheckPT::sendProcData()
933   {
934     // find out size of buffer
935     int size;
936     {
937       PUP::sizer p;
938       _handleProcData(p,CmiTrue);
939       size = p.size();
940     }
941     int packSize = size;
942     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
943     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
944     {
945       PUP::toMem p(msg->packData);
946       _handleProcData(p,CmiTrue);
947     }
948     msg->pe = CkMyPe();
949     msg->len = size;
950     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
951     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
952   }
953
954   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
955   {
956     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
957     CpvAccess(procChkptBuf) = msg;
958     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
959     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
960   }
961
962   // ArrayElement call this function to give us the checkpointed data
963   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
964   {
965     int len = ckTable.length();
966     int idx;
967     for (idx=0; idx<len; idx++) {
968       CkCheckPTInfo *entry = ckTable[idx];
969       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
970     }
971     CkAssert(idx < len);
972     int isChkpting = msg->cp_flag;
973     ckTable[idx]->updateBuffer(msg);
974     if (isChkpting) {
975       // all my array elements have returned their inmem data
976       // inform starter processor that I am done.
977       recvCount ++;
978       if (recvCount == ckTable.length()) {
979         if (where == CkCheckPoint_inMEM) {
980           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
981         }
982         else if (where == CkCheckPoint_inDISK) {
983           // another barrier for finalize the writing using fsync
984           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
985           contribute(0,NULL,CkReduction::sum_int,localcb);
986         }
987         else
988           CmiAbort("Unknown checkpoint scheme");
989         recvCount = 0;
990       } 
991     }
992   }
993
994   // only used in disk checkpointing
995   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
996   {
997     delete m;
998 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
999     system("sync");
1000 #endif
1001     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1002   }
1003
1004   // only is called on cpStarter when checkpoint is done
1005   void CkMemCheckPT::cpFinish()
1006   {
1007     CmiAssert(CkMyPe() == cpStarter);
1008     peCount++;
1009     // now that all processors have finished, activate callback
1010     if (peCount == 2) 
1011     {
1012       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1013       peCount = 0;
1014       thisProxy.report();
1015     }
1016   }
1017
1018   // for debugging, report checkpoint info
1019   void CkMemCheckPT::report()
1020   {
1021     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1022     inCheckpointing = 0;
1023 #if !CMK_CHKP_ALL
1024     int objsize = 0;
1025     int len = ckTable.length();
1026     for (int i=0; i<len; i++) {
1027       CkCheckPTInfo *entry = ckTable[i];
1028       CmiAssert(entry);
1029       objsize += entry->getSize();
1030     }
1031     CmiAssert(CpvAccess(procChkptBuf));
1032     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1033 #else
1034     if(CkMyPe()==0)
1035       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1036 #endif
1037   }
1038
1039   /*****************************************************************************
1040     RESTART Procedure
1041    *****************************************************************************/
1042
1043   // master processor of two buddies
1044   inline int CkMemCheckPT::isMaster(int buddype)
1045   {
1046 #if 0
1047     int mype = CkMyPe();
1048     //CkPrintf("ismaster: %d %d\n", pe, mype);
1049     if (CkNumPes() - totalFailed() == 2) {
1050       return mype > buddype;
1051     }
1052     for (int i=1; i<CkNumPes(); i++) {
1053       int me = (buddype+i)%CkNumPes();
1054       if (isFailed(me)) continue;
1055       if (me == mype) return 1;
1056       else return 0;
1057     }
1058     return 0;
1059 #else
1060     // smaller one
1061     int mype = CkMyPe();
1062     //CkPrintf("ismaster: %d %d\n", pe, mype);
1063     if (CkNumPes() - totalFailed() == 2) {
1064       return mype < buddype;
1065     }
1066 #if NODE_CHECKPOINT
1067     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1068     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1069 #else
1070       for (int i=1; i<CkNumPes(); i++) {
1071 #endif
1072         int me = (mype+i)%CkNumPes();
1073         if (isFailed(me)) continue;
1074         if (me == buddype) return 1;
1075         else return 0;
1076       }
1077       return 0;
1078 #endif
1079     }
1080
1081
1082
1083 #if 0
1084     // helper class to pup all elements that belong to same ckLocMgr
1085     class ElementDestoryer : public CkLocIterator {
1086       private:
1087         CkLocMgr *locMgr;
1088       public:
1089         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1090         void addLocation(CkLocation &loc) {
1091           CkArrayIndex idx=loc.getIndex();
1092           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1093           loc.destroy();
1094         }
1095     };
1096 #endif
1097
1098     // restore the bitmap vector for LB
1099     void CkMemCheckPT::resetLB(int diepe)
1100     {
1101 #if CMK_LBDB_ON
1102       int i;
1103       char *bitmap = new char[CkNumPes()];
1104       // set processor available bitmap
1105       get_avail_vector(bitmap);
1106       for (i=0; i<failedPes.length(); i++)
1107         bitmap[failedPes[i]] = 0; 
1108       bitmap[diepe] = 0;
1109
1110 #if CK_NO_PROC_POOL
1111       set_avail_vector(bitmap);
1112 #endif
1113
1114       // if I am the crashed pe, rebuild my failedPEs array
1115       if (CkMyNode() == diepe)
1116         for (i=0; i<CkNumPes(); i++) 
1117           if (bitmap[i]==0) failed(i);
1118
1119       delete [] bitmap;
1120 #endif
1121     }
1122
1123     static double restartT;
1124
1125     // in case when failedPe dies, everybody go through its checkpoint table:
1126     // destory all array elements
1127     // recover lost buddies
1128     // reconstruct all array elements from check point data
1129     // called on all processors
1130     void CkMemCheckPT::restart(int diePe)
1131     {
1132 #if CMK_MEM_CHECKPOINT
1133       double curTime = CmiWallTimer();
1134       if (CkMyPe() == diePe){
1135         restartT = CmiWallTimer();
1136         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1137       }
1138       stage = (char*)"resetLB";
1139       startTime = curTime;
1140       if (CkMyPe() == diePe)
1141         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1142
1143 #if CK_NO_PROC_POOL
1144       failed(diePe);    // add into the list of failed pes
1145 #endif
1146       thisFailedPe = diePe;
1147
1148       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1149
1150       inRestarting = 1;
1151
1152       // disable load balancer's barrier
1153       //if (CkMyPe() != diePe) resetLB(diePe);
1154
1155       CKLOCMGR_LOOP(mgr->startInserting(););
1156
1157
1158       if(CmiNumPartition()==1){
1159         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1160       }else{
1161         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1162         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1163       }
1164 #endif
1165     }
1166
1167     // loally remove all array elements
1168     void CkMemCheckPT::removeArrayElements()
1169     {
1170 #if CMK_MEM_CHECKPOINT
1171       int len = ckTable.length();
1172       double curTime = CmiWallTimer();
1173       if (CkMyPe() == thisFailedPe) 
1174         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1175       stage = (char*)"removeArrayElements";
1176       startTime = curTime;
1177
1178       //  if (cpCallback.isInvalid()) 
1179       //          CkPrintf("invalid pe %d\n",CkMyPe());
1180       //          CkAbort("Didn't set restart callback\n");;
1181       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1182
1183       // get rid of all buffering and remote recs
1184       // including destorying all array elements
1185 #if CK_NO_PROC_POOL  
1186       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1187 #else
1188       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1189 #endif
1190       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1191 #endif
1192     }
1193
1194     // flush state in reduction manager
1195     void CkMemCheckPT::resetReductionMgr()
1196     {
1197       if (CkMyPe() == thisFailedPe) 
1198         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1199       int numGroups = CkpvAccess(_groupIDTable)->size();
1200       for(int i=0;i<numGroups;i++) {
1201         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1202         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1203         obj->flushStates();
1204         obj->ckJustMigrated();
1205       }
1206       // reset again
1207       //CpvAccess(_qd)->flushStates();
1208       if(CmiNumPartition()==1){
1209         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1210       }
1211       else
1212         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1213     }
1214
1215     // recover the lost buddies
1216     void CkMemCheckPT::recoverBuddies()
1217     {
1218       int idx;
1219       int len = ckTable.length();
1220       // ready to flush reduction manager
1221       // cannot be CkMemCheckPT::restart because destory will modify states
1222       double curTime = CmiWallTimer();
1223       if (CkMyPe() == thisFailedPe)
1224         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1225       stage = (char *)"recoverBuddies";
1226       if (CkMyPe() == thisFailedPe)
1227         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1228       startTime = curTime;
1229
1230       // recover buddies
1231       expectCount = 0;
1232 #if !CMK_CHKP_ALL
1233       for (idx=0; idx<len; idx++) {
1234         CkCheckPTInfo *entry = ckTable[idx];
1235         if (entry->pNo == thisFailedPe) {
1236 #if CK_NO_PROC_POOL
1237           // find a new buddy
1238           int budPe = BuddyPE(CkMyPe());
1239 #else
1240           int budPe = thisFailedPe;
1241 #endif
1242           CkArrayCheckPTMessage *msg = entry->getCopy();
1243           msg->bud1 = budPe;
1244           msg->bud2 = CkMyPe();
1245           msg->cp_flag = 0;            // not checkpointing
1246           thisProxy[budPe].recoverEntry(msg);
1247           expectCount ++;
1248         }
1249       }
1250 #else
1251       //send to failed pe
1252       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1253 #if CK_NO_PROC_POOL
1254         // find a new buddy
1255         int budPe = BuddyPE(CkMyPe());
1256 #else
1257         int budPe = thisFailedPe;
1258 #endif
1259         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1260         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1261         msg->cp_flag = 0;            // not checkpointing
1262         msg->bud1 = budPe;
1263         msg->bud2 = CkMyPe();
1264         thisProxy[budPe].recoverEntry(msg);
1265         expectCount ++;
1266       }
1267 #endif
1268
1269       if (expectCount == 0) {
1270         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1271       }
1272       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1273     }
1274
1275     void CkMemCheckPT::gotData()
1276     {
1277       ackCount ++;
1278       if (ackCount == expectCount) {
1279         ackCount = 0;
1280         expectCount = -1;
1281         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1282         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1283       }
1284     }
1285
1286     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1287     {
1288
1289       for (int i=0; i<n; i++) {
1290         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1291         mgr->updateLocation(idx[i], nowOnPe);
1292       }
1293       thisProxy[nowOnPe].gotReply();
1294     }
1295
1296     // restore array elements
1297     void CkMemCheckPT::recoverArrayElements()
1298     {
1299       if(CmiMyPartition()==1&&CkMyPe()==0){
1300         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1301         fflush(stdout);
1302       }
1303       double curTime = CmiWallTimer();
1304       int len = ckTable.length();
1305       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1306       stage = (char *)"recoverArrayElements";
1307       if (CkMyPe() == thisFailedPe)
1308         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1309       startTime = curTime;
1310       int flag = 0;
1311       // recover all array elements
1312       int count = 0;
1313
1314 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1315       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1316       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1317 #endif
1318
1319 #if !CMK_CHKP_ALL
1320       for (int idx=0; idx<len; idx++)
1321       {
1322         CkCheckPTInfo *entry = ckTable[idx];
1323 #if CK_NO_PROC_POOL
1324         // the bigger one will do 
1325         //    if (CkMyPe() < entry->pNo) continue;
1326         if (!isMaster(entry->pNo)) continue;
1327 #else
1328         // smaller one do it, which has the original object
1329         if (CkMyPe() == entry->pNo+1 || 
1330             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1331 #endif
1332         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1333
1334         entry->updateBuddy(CkMyPe(), entry->pNo);
1335         CkArrayCheckPTMessage *msg = entry->getCopy();
1336         // gzheng
1337         //thisProxy[CkMyPe()].inmem_restore(msg);
1338         inmem_restore(msg);
1339 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1340         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1341         int homePe = mgr->homePe(msg->index);
1342         if (homePe != CkMyPe()) {
1343           gmap[homePe].push_back(msg->locMgr);
1344           imap[homePe].push_back(msg->index);
1345         }
1346 #endif
1347         CkFreeMsg(msg);
1348         count ++;
1349       }
1350 #else
1351       char * packData;
1352       if(CmiNumPartition()==1){
1353         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1354         packData = (char *)msg->packData;
1355       }
1356       else{
1357         int pointer = CpvAccess(curPointer);
1358         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1359         packData = msg->packData;
1360       }
1361 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1362       recoverAll(packData,gmap,imap);
1363 #else
1364       recoverAll(packData);
1365 #endif
1366 #endif
1367       curTime = CmiWallTimer();
1368 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1369       for (int i=0; i<CkNumPes(); i++) {
1370         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1371           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1372           flag++;       
1373         }
1374       }
1375       delete [] imap;
1376       delete [] gmap;
1377 #endif
1378       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1379
1380       CKLOCMGR_LOOP(mgr->doneInserting(););
1381
1382       // _crashedNode = -1;
1383       CpvAccess(_crashedNode) = -1;
1384 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1385       if (CkMyPe() == 0)
1386         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1387 #else
1388       if(flag == 0)
1389       {
1390         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1391       }
1392 #endif
1393       if(CmiMyPartition()==1&&CkMyPe()==0){
1394         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1395         fflush(stdout);
1396       }
1397     }
1398
1399     void CkMemCheckPT::gotReply(){
1400       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1401     }
1402
1403     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1404       if(CmiMyPartition()==1&&CkMyPe()==0){
1405         CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1406         fflush(stdout);
1407       }
1408 #if CMK_CHKP_ALL
1409       PUP::fromMem p(packData);
1410       int numElements;
1411       p|numElements;
1412       if(p.isUnpacking()){
1413         for(int i=0;i<numElements;i++){
1414           CkGroupID gID;
1415           CkArrayIndex idx;
1416           p|gID;
1417           p|idx;
1418           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1419           int homePe = mgr->homePe(idx);
1420 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1421           mgr->resume(idx,p,CmiTrue,CmiTrue);
1422 #else
1423           if(CmiNumPartition()==1)
1424             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1425           else{
1426             if(CkMyPe()==thisFailedPe){
1427               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1428             }
1429             else{
1430               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1431             }
1432           }
1433 #endif
1434 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1435           homePe = mgr->homePe(idx);
1436           if (homePe != CkMyPe()) {
1437             gmap[homePe].push_back(gID);
1438             imap[homePe].push_back(idx);
1439           }
1440 #endif
1441         }
1442       }
1443 #endif
1444       if(CmiMyPartition()==1&&CkMyPe()==0){
1445         CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1446         fflush(stdout);
1447       }
1448     }
1449
1450
1451     // on every processor
1452     // turn load balancer back on
1453     void CkMemCheckPT::finishUp()
1454     {
1455       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1456       //CKLOCMGR_LOOP(mgr->doneInserting(););
1457
1458       if (CkMyPe() == thisFailedPe)
1459       {
1460         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1461         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1462         fflush(stdout);
1463       }
1464       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1465       inRestarting = 0;
1466
1467 #if CMK_CONVERSE_MPI    
1468       if(CmiNumPartition()!=1){
1469         CpvAccess(recvdProcChkp) = 0;
1470         CpvAccess(recvdArrayChkp) = 0;
1471         CpvAccess(curPointer)^=1;
1472         //notify my replica, restart is done
1473         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1474         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1475         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1476       }
1477       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1478         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1479       }
1480 #endif
1481
1482 #if CK_NO_PROC_POOL
1483 #if NODE_CHECKPOINT
1484       int numnodes = CmiNumPhysicalNodes();
1485 #else
1486       int numnodes = CkNumPes();
1487 #endif
1488       if (numnodes-totalFailed() <=2) {
1489         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1490         _memChkptOn = 0;
1491       }
1492 #endif
1493     }
1494
1495     void CkMemCheckPT::recoverFromSoftFailure()
1496     {
1497       inRestarting = 0;
1498       CpvAccess(recvdRemote) = 0;
1499       CpvAccess(recvdLocal) = 0;
1500       CpvAccess(localChkpDone) = 0;
1501       CpvAccess(remoteChkpDone) = 0;
1502       inCheckpointing = 0;
1503       notifyReplica = 0;
1504       if(CkMyPe() == 0){
1505         CmiPrintf("[%d][%d] Recover From soft failures, sending callback ... \n", CmiMyPartition(),CkMyPe());
1506       }
1507       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1508     }
1509     // called only on 0
1510     void CkMemCheckPT::quiescence(CkCallback &cb)
1511     {
1512       static int pe_count = 0;
1513       pe_count ++;
1514       CmiAssert(CkMyPe() == 0);
1515       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1516       if (pe_count == CkNumPes()) {
1517         pe_count = 0;
1518         cb.send();
1519       }
1520     }
1521
1522     // User callable function - to start a checkpoint
1523     // callback cb is used to pass control back
1524     void CkStartMemCheckpoint(CkCallback &cb)
1525     {
1526 #if CMK_MEM_CHECKPOINT
1527       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1528       /*if (_memChkptOn == 0) {
1529         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1530         cb.send();
1531         return;
1532         }*/
1533       if (CkInRestarting()) {
1534         // trying to checkpointing during restart
1535         cb.send();
1536         return;
1537       }
1538       // store user callback and user data
1539       CkMemCheckPT::cpCallback = cb;
1540
1541       // broadcast to start check pointing
1542       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1543       checkptMgr.doItNow(CkMyPe(), cb);
1544 #else
1545       // when mem checkpoint is disabled, invike cb immediately
1546       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1547       cb.send();
1548 #endif
1549     }
1550
1551     void CkRestartCheckPoint(int diePe)
1552     {
1553       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1554       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1555       // broadcast
1556       checkptMgr.restart(diePe);
1557     }
1558
1559     static int _diePE = -1;
1560
1561     // callback function used locally by ccs handler
1562     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1563     {
1564       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1565       CkRestartCheckPoint(_diePE);
1566     }
1567
1568
1569     // called on crashed PE
1570     static void restartBeginHandler(char *msg)
1571     {
1572       CmiFree(msg);
1573 #if CMK_MEM_CHECKPOINT
1574 #if CMK_USE_BARRIER
1575       if(CkMyPe()!=_diePE){
1576         printf("restar begin on %d\n",CkMyPe());
1577         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1578         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1579         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1580       }else{
1581         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1582         CkRestartCheckPointCallback(NULL, NULL);
1583       }
1584 #else
1585       static int count = 0;
1586       CmiAssert(CkMyPe() == _diePE);
1587       count ++;
1588       if (count == CkNumPes()) {
1589         printf("restart begin on %d\n",CkMyPe());
1590         CkRestartCheckPointCallback(NULL, NULL);
1591         count = 0;
1592       }
1593 #endif
1594 #endif
1595     }
1596
1597     extern void _discard_charm_message();
1598     extern void _resume_charm_message();
1599
1600     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1601       return data;
1602     }
1603
1604     static void restartBcastHandler(char *msg)
1605     {
1606 #if CMK_MEM_CHECKPOINT
1607       // advance phase counter
1608       CkMemCheckPT::inRestarting = 1;
1609       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1610       // gzheng
1611       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1612
1613       if (CkMyPe()==_diePE)
1614         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1615
1616       // reset QD counters
1617       /*  gzheng
1618           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1619        */
1620
1621       /*  gzheng
1622           if (CkMyPe()==_diePE)
1623           CkRestartCheckPointCallback(NULL, NULL);
1624        */
1625       CmiFree(msg);
1626
1627       _resume_charm_message();
1628
1629       // reduction
1630       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1631       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1632 #if CMK_USE_BARRIER
1633       //CmiPrintf("before reduce\n");   
1634       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1635       //CmiPrintf("after reduce\n");    
1636 #else
1637       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1638 #endif 
1639       checkpointed = 0;
1640 #endif
1641     }
1642
1643     extern void _initDone();
1644
1645     bool compare(char * buf1, char *buf2){
1646       PUP::checker pchecker(buf1,buf2);
1647       pchecker.skip();
1648       int numElements;
1649       pchecker|numElements;
1650       for(int i=0;i<numElements;i++){
1651         CkGroupID gID;
1652         CkArrayIndex idx;
1653
1654         pchecker|gID;
1655         pchecker|idx;
1656
1657         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1658         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1659       }
1660       return pchecker.getResult();
1661       //return true;
1662     }
1663     int getChecksum(char * buf){
1664       PUP::checker pchecker(buf);
1665       pchecker.skip();
1666       int numElements;
1667       pchecker|numElements;
1668 //      CkPrintf("num %d\n",numElements);
1669       for(int i=0;i<numElements;i++){
1670         CkGroupID gID;
1671         CkArrayIndex idx;
1672
1673         pchecker|gID;
1674         pchecker|idx;
1675
1676         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1677         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1678       }
1679       return pchecker.getChecksum();
1680     }
1681
1682     static void recvRemoteChkpHandler(char *msg){
1683 #if CMK_USE_CHECKSUM
1684       CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1685       CpvAccess(recvdRemote) = 1;
1686       if(CpvAccess(recvdLocal)==1){
1687         if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1688           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1689         }
1690         else{
1691           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1692         }
1693       }
1694 #else
1695       envelope *env = (envelope *)msg;
1696       CkUnpackMessage(&env);
1697       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1698       if(CpvAccess(recvdLocal)==1){
1699         int pointer = CpvAccess(curPointer);
1700         int size = CpvAccess(chkpBuf)[pointer]->len;
1701         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1702           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1703         }else
1704         {
1705          // CkPrintf("[%d][%d] failed the test my size %d buddy size %d pointer %d\n",CmiMyPartition(),CkMyPe(),size,chkpMsg->len,pointer);
1706           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1707         }
1708         delete chkpMsg;
1709         if(CkMyPe()==0)
1710           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1711       }else{
1712         CpvAccess(recvdRemote) = 1;
1713         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1714         CpvAccess(buddyBuf) = chkpMsg;
1715       }
1716 #endif
1717     }
1718
1719     static void replicaRecoverHandler(char *msg){
1720       CpvAccess(_remoteCrashedNode) = -1;
1721       CkMemCheckPT::replicaAlive = 1;
1722       //fflush(stdout);
1723       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1724       bool ret = true;
1725       CpvAccess(remoteChkpDone) = 1;
1726       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1727       CmiFree(msg);
1728
1729     }
1730     static void replicaChkpDoneHandler(char *msg){
1731       CpvAccess(remoteChkpDone) = 1;
1732       if(CpvAccess(localChkpDone) == 1)
1733         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1734       CmiFree(msg);
1735     }
1736
1737     static void replicaDieHandler(char * msg){
1738 #if CMK_CONVERSE_MPI    
1739       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1740       CpvAccess(_remoteCrashedNode) = diePe;
1741       CkMemCheckPT::replicaAlive = 0;
1742       if(CkMyPe()==diePe){
1743         CmiPrintf("pe %d in replicad word die\n",diePe);
1744         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1745         fflush(stdout);
1746       }
1747       find_spare_mpirank(diePe,CmiMyPartition()^1);
1748 #endif
1749       //broadcast to my partition to get local max iter
1750       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1751       CmiFree(msg);
1752     }
1753
1754
1755     static void replicaDieBcastHandler(char *msg){
1756       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1757       CpvAccess(_remoteCrashedNode) = diePe;
1758       CkMemCheckPT::replicaAlive = 0;
1759       CmiFree(msg);
1760     }
1761
1762     static void recoverRemoteProcDataHandler(char *msg){
1763       if(CmiMyPartition()==1&&CkMyPe()==0){
1764         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1765         fflush(stdout);
1766       }
1767       envelope *env = (envelope *)msg;
1768       CkUnpackMessage(&env);
1769       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1770
1771       //store the checkpoint
1772       int pointer = procMsg->pointer;
1773
1774
1775       if(CkMyPe()==CpvAccess(_crashedNode)){
1776         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1777         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1778         PUP::fromMem p(procMsg->packData);
1779         _handleProcData(p,CmiTrue);
1780         _initDone();
1781       }
1782       else{
1783         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1784         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1785         //_handleProcData(p,CmiFalse);
1786       }
1787       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1788       CKLOCMGR_LOOP(mgr->startInserting(););
1789
1790       CpvAccess(recvdProcChkp) =1;
1791       if(CpvAccess(recvdArrayChkp)==1){
1792         _resume_charm_message();
1793         _diePE = CpvAccess(_crashedNode);
1794         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1795         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1796         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1797       }
1798       if(CmiMyPartition()==1&&CkMyPe()==0){
1799         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1800         fflush(stdout);
1801       }
1802     }
1803
1804     static void recoverRemoteArrayDataHandler(char *msg){
1805       envelope *env = (envelope *)msg;
1806       CkUnpackMessage(&env);
1807       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1808       if(CmiMyPartition()==1&&CkMyPe()==0){
1809         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1810         fflush(stdout);
1811       }
1812
1813       //store the checkpoint
1814       int pointer = chkpMsg->pointer;
1815       CpvAccess(curPointer) = pointer;
1816       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1817       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1818       CpvAccess(recvdArrayChkp) =1;
1819       CkMemCheckPT::inRestarting = 1;
1820       if(CpvAccess(recvdProcChkp) == 1){
1821         _resume_charm_message();
1822         _diePE = CpvAccess(_crashedNode);
1823         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1824         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1825         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1826         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1827       }
1828       if(CmiMyPartition()==1&&CkMyPe()==0){
1829         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1830         fflush(stdout);
1831       }
1832     }
1833
1834     static void recvPhaseHandler(char * msg)
1835     {
1836       CpvAccess(_curRestartPhase)--;
1837       CkMemCheckPT::inRestarting = 1;
1838       CmiFree(msg);
1839     }
1840     // called on crashed processor
1841     static void recoverProcDataHandler(char *msg)
1842     {
1843 #if CMK_MEM_CHECKPOINT
1844       int i;
1845       envelope *env = (envelope *)msg;
1846       CkUnpackMessage(&env);
1847       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1848       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1849       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1850       PUP::fromMem p(procMsg->packData);
1851       _handleProcData(p,CmiTrue);
1852
1853       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1854       // gzheng
1855       CKLOCMGR_LOOP(mgr->startInserting(););
1856
1857       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1858       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1859       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1860       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1861
1862       _initDone();
1863       //   CpvAccess(_qd)->flushStates();
1864       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1865 #endif
1866     }
1867     //for replica, got the phase number from my neighbor processor in the current partition
1868     static void askPhaseHandler(char *msg)
1869     {
1870 #if CMK_MEM_CHECKPOINT
1871       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1872       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1873       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1874       CmiSetHandler(msg, recvPhaseHandlerIdx);
1875       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1876 #endif
1877     }
1878     // called on its backup processor
1879     // get backup message buffer and sent to crashed processor
1880     static void askProcDataHandler(char *msg)
1881     {
1882 #if CMK_MEM_CHECKPOINT
1883       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1884       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1885       if (CpvAccess(procChkptBuf) == NULL)  {
1886         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1887         CkAbort("no checkpoint found");
1888       }
1889       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1890       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1891
1892       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1893
1894       CkPackMessage(&env);
1895       CmiSetHandler(env, recoverProcDataHandlerIdx);
1896       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1897       CpvAccess(procChkptBuf) = NULL;
1898       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1899 #endif
1900     }
1901
1902     // called on PE 0
1903     void qd_callback(void *m)
1904     {
1905       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1906       fflush(stdout);
1907       CkFreeMsg(m);
1908       if(CmiNumPartition()==1){
1909 #ifdef CMK_SMP
1910         for(int i=0;i<CmiMyNodeSize();i++){
1911           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1912           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1913           CmiSetHandler(msg, askProcDataHandlerIdx);
1914           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1915           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1916         }
1917         return;
1918 #endif
1919         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1920         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1921         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1922         CmiSetHandler(msg, askProcDataHandlerIdx);
1923         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1924         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1925       }
1926       else{
1927         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1928         CmiSetHandler(msg, recvPhaseHandlerIdx);
1929         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1930       }
1931     }
1932
1933     // on crashed node
1934     void CkMemRestart(const char *dummy, CkArgMsg *args)
1935     {
1936 #if CMK_MEM_CHECKPOINT
1937       _diePE = CmiMyNode();
1938       CpvAccess( _crashedNode )= CmiMyNode();
1939       CkMemCheckPT::inRestarting = 1;
1940       _discard_charm_message();
1941
1942       /*if(CmiMyRank()==0){
1943         CkCallback cb(qd_callback);
1944         CkStartQD(cb);
1945         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1946         }*/
1947       if(CmiNumPartition()==1){
1948         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1949         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1950         restartT = CmiWallTimer();
1951         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1952         fflush(stdout);
1953         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1954         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1955         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1956         CmiSetHandler(msg, askProcDataHandlerIdx);
1957         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1958         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1959       }
1960       else{
1961         CkCallback cb(qd_callback);
1962         CkStartQD(cb);
1963       }
1964 #else
1965       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1966 #endif
1967     }
1968
1969     // can be called in other files
1970     // return true if it is in restarting
1971     extern "C"
1972       int CkInRestarting()
1973       {
1974 #if CMK_MEM_CHECKPOINT
1975         if (CpvAccess( _crashedNode)!=-1) return 1;
1976         // gzheng
1977         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1978         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1979         return CkMemCheckPT::inRestarting;
1980 #else
1981         return 0;
1982 #endif
1983       }
1984
1985     extern "C"
1986       int CkReplicaAlive()
1987       {
1988         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1989         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1990         return CkMemCheckPT::replicaAlive;
1991
1992         /*if(CkMemCheckPT::replicaDead==1)
1993           return 0;
1994           else          
1995           return 1;*/
1996       }
1997
1998     extern "C"
1999       int CkInCheckpointing()
2000       {
2001         return CkMemCheckPT::inCheckpointing;
2002       }
2003
2004     extern "C"
2005       void CkSetInLdb(){
2006 #if CMK_MEM_CHECKPOINT
2007         CkMemCheckPT::inLoadbalancing = 1;
2008 #endif
2009       }
2010
2011     extern "C"
2012       int CkInLdb(){
2013 #if CMK_MEM_CHECKPOINT
2014         return CkMemCheckPT::inLoadbalancing;
2015 #endif
2016         return 0;
2017       }
2018
2019     extern "C"
2020       void CkResetInLdb(){
2021 #if CMK_MEM_CHECKPOINT
2022         CkMemCheckPT::inLoadbalancing = 0;
2023 #endif
2024       }
2025
2026     /*****************************************************************************
2027       module initialization
2028      *****************************************************************************/
2029
2030     static int arg_where = CkCheckPoint_inMEM;
2031
2032 #if CMK_MEM_CHECKPOINT
2033     void init_memcheckpt(char **argv)
2034     {
2035       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2036         arg_where = CkCheckPoint_inDISK;
2037       }
2038       // initiliazing _crashedNode variable
2039       CpvInitialize(int, _crashedNode);
2040       CpvInitialize(int, _remoteCrashedNode);
2041       CpvAccess(_crashedNode) = -1;
2042       CpvAccess(_remoteCrashedNode) = -1;
2043       init_FI(argv);
2044     }
2045 #endif
2046
2047     class CkMemCheckPTInit: public Chare {
2048       public:
2049         CkMemCheckPTInit(CkArgMsg *m) {
2050 #if CMK_MEM_CHECKPOINT
2051           if (arg_where == CkCheckPoint_inDISK) {
2052             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2053           }
2054           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2055           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2056 #endif
2057         }
2058     };
2059
2060     static void notifyHandler(char *msg)
2061     {
2062 #if CMK_MEM_CHECKPOINT
2063       CmiFree(msg);
2064       /* immediately increase restart phase to filter old messages */
2065       CpvAccess(_curRestartPhase) ++;
2066       CpvAccess(_qd)->flushStates();
2067       _discard_charm_message();
2068
2069 #endif
2070     }
2071
2072     extern "C"
2073       void notify_crash(int node)
2074       {
2075 #ifdef CMK_MEM_CHECKPOINT
2076         CpvAccess( _crashedNode) = node;
2077 #ifdef CMK_SMP
2078         for(int i=0;i<CkMyNodeSize();i++){
2079           CpvAccessOther(_crashedNode,i)=node;
2080         }
2081 #endif
2082         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2083         CkMemCheckPT::inRestarting = 1;
2084
2085         // this may be in interrupt handler, send a message to reset QD
2086         int pe = CmiNodeFirst(CkMyNode());
2087         for(int i=0;i<CkMyNodeSize();i++){
2088           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2089           CmiSetHandler(msg, notifyHandlerIdx);
2090           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2091         }
2092 #endif
2093       }
2094
2095     extern "C" void (*notify_crash_fn)(int node);
2096
2097
2098 #if CMK_CONVERSE_MPI
2099     void buddyDieHandler(char *msg)
2100     {
2101 #if CMK_MEM_CHECKPOINT
2102       // notify
2103       CkMemCheckPT::inRestarting = 1;
2104       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2105       notify_crash(diepe);
2106       // send message to crash pe to let it restart
2107       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2108       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2109       int buddy = obj->BuddyPE(CmiMyPe());
2110       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2111       if (buddy == diepe)  {
2112         mpi_restart_crashed(diepe, newrank);
2113       }
2114 #endif
2115     }
2116
2117     void pingHandler(void *msg)
2118     {
2119       lastPingTime = CmiWallTimer();
2120       CmiFree(msg);
2121     }
2122
2123     void pingCheckHandler()
2124     {
2125 #if CMK_MEM_CHECKPOINT
2126       double now = CmiWallTimer();
2127       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2128         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2129         int i, pe, buddy;
2130         // tell everyone the buddy dies
2131         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2132         for (i = 1; i < CmiNumPes(); i++) {
2133           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2134           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2135         }
2136         buddy = pe;
2137         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2138         fflush(stdout);
2139         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2140         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2141         CmiSetHandler(msg, buddyDieHandlerIdx);
2142         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2143         //send to everyone in the other world
2144         if(CmiNumPartition()!=1){
2145           for(int i=0;i<CmiNumPes();i++){
2146             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2147             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2148             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2149             //fflush(stdout);
2150             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2151             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2152           }
2153         }
2154       }
2155         else 
2156           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2157 #endif
2158       }
2159
2160       void pingBuddy()
2161       {
2162 #if CMK_MEM_CHECKPOINT
2163         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2164         if (obj) {
2165           int buddy = obj->BuddyPE(CkMyPe());
2166           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2167           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2168           CmiSetHandler(msg, pingHandlerIdx);
2169           CmiGetRestartPhase(msg) = 9999;
2170           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2171         }
2172         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2173 #endif
2174       }
2175 #endif
2176
2177       // initproc
2178       void CkRegisterRestartHandler( )
2179       {
2180 #if CMK_MEM_CHECKPOINT
2181         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2182         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2183         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2184         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2185         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2186
2187         //for replica
2188         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2189         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2190         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2191         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2192         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2193         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2194         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2195         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2196         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2197
2198 #if CMK_CONVERSE_MPI
2199         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2200         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2201         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2202 #endif
2203
2204         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2205         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2206         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2207         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2208         CpvInitialize(int,curPointer);
2209         CpvInitialize(int,recvdLocal);
2210         CpvInitialize(int,localChkpDone);
2211         CpvInitialize(int,remoteChkpDone);
2212         CpvInitialize(int,recvdRemote);
2213         CpvInitialize(int,recvdProcChkp);
2214         CpvInitialize(int,localChecksum);
2215         CpvInitialize(int,remoteChecksum);
2216         CpvInitialize(int,recvdArrayChkp);
2217
2218         CpvAccess(procChkptBuf) = NULL;
2219         CpvAccess(buddyBuf) = NULL;
2220         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2221         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2222         CpvAccess(chkpBuf)[0] = NULL;
2223         CpvAccess(chkpBuf)[1] = NULL;
2224         CpvAccess(localProcChkpBuf)[0] = NULL;
2225         CpvAccess(localProcChkpBuf)[1] = NULL;
2226
2227         CpvAccess(curPointer) = 0;
2228         CpvAccess(recvdLocal) = 0;
2229         CpvAccess(localChkpDone) = 0;
2230         CpvAccess(remoteChkpDone) = 0;
2231         CpvAccess(recvdRemote) = 0;
2232         CpvAccess(recvdProcChkp) = 0;
2233         CpvAccess(localChecksum) = 0;
2234         CpvAccess(remoteChecksum) = 0;
2235         CpvAccess(recvdArrayChkp) = 0;
2236
2237         notify_crash_fn = notify_crash;
2238
2239 #if ! CMK_CONVERSE_MPI
2240         // print pid to kill
2241         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2242         //  sleep(4);
2243 #endif
2244 #endif
2245       }
2246
2247
2248       extern "C"
2249         int CkHasCheckpoints()
2250         {
2251           return checkpointed;
2252         }
2253
2254       /// @todo: the following definitions should be moved to a separate file containing
2255       // structures and functions about fault tolerance strategies
2256
2257       /**
2258        *  * @brief: function for killing a process                                             
2259        *   */
2260 #ifdef CMK_MEM_CHECKPOINT
2261 #if CMK_HAS_GETPID
2262       void killLocal(void *_dummy,double curWallTime){
2263         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2264         if(CmiWallTimer()<killTime-1){
2265           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2266         }else{ 
2267 #if CMK_CONVERSE_MPI
2268           CkDieNow();
2269 #else 
2270           kill(getpid(),SIGKILL);                                               
2271 #endif
2272         }              
2273       } 
2274 #else
2275       void killLocal(void *_dummy,double curWallTime){
2276         CmiAbort("kill() not supported!");
2277       }
2278 #endif
2279 #endif
2280
2281 #ifdef CMK_MEM_CHECKPOINT
2282       /**
2283        * @brief: reads the file with the kill information
2284        */
2285       void readKillFile(){
2286         FILE *fp=fopen(killFile,"r");
2287         if(!fp){
2288           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2289           return;
2290         }
2291         int proc;
2292         double sec;
2293         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2294           if(proc == CkMyNode() && CkMyRank() == 0){
2295             killTime = CmiWallTimer()+sec;
2296             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2297             CcdCallFnAfter(killLocal,NULL,sec*1000);
2298           }
2299         }
2300         fclose(fp);
2301       }
2302
2303 #if ! CMK_CONVERSE_MPI
2304       void CkDieNow()
2305       {
2306 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2307         // ignored for non-mpi version
2308         CmiPrintf("[%d] die now.\n", CmiMyPe());
2309         killTime = CmiWallTimer()+0.001;
2310         CcdCallFnAfter(killLocal,NULL,1);
2311 #endif
2312       }
2313 #endif
2314
2315 #endif
2316
2317 #include "CkMemCheckpoint.def.h"
2318
2319