checksum for ampi
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, _remoteCrashedNode);
77
78 // static, so that it is accessible from Converse part
79 int CkMemCheckPT::inRestarting = 0;
80 int CkMemCheckPT::inCheckpointing = 0;
81 int CkMemCheckPT::replicaAlive = 1;
82 int CkMemCheckPT::inLoadbalancing = 0;
83 double CkMemCheckPT::startTime;
84 char *CkMemCheckPT::stage;
85 CkCallback CkMemCheckPT::cpCallback;
86
87 int _memChkptOn = 1;                    // checkpoint is on or off
88
89 CkGroupID ckCheckPTGroupID;             // readonly
90
91 static int checkpointed = 0;
92
93 /// @todo the following declarations should be moved into a separate file for all 
94 // fault tolerant strategies
95
96 #ifdef CMK_MEM_CHECKPOINT
97 // name of the kill file that contains processes to be killed 
98 char *killFile;                                               
99 // flag for the kill file         
100 int killFlag=0;
101 // variable for storing the killing time
102 double killTime=0.0;
103 #endif
104
105 #ifdef CKLOCMGR_LOOP
106 #undef CKLOCMGR_LOOP
107 #endif
108 // loop over all CkLocMgr and do "code"
109 #define  CKLOCMGR_LOOP(code)    {       \
110   int numGroups = CkpvAccess(_groupIDTable)->size();    \
111   for(int i=0;i<numGroups;i++) {        \
112     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
113     if(obj->isLocMgr())  {      \
114       CkLocMgr *mgr = (CkLocMgr*)obj;   \
115       code      \
116     }   \
117   }     \
118 }
119
120 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
121 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
122 CpvDeclare(CkCheckPTMessage**, chkpBuf);
123 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
124 //store the checkpoint of the buddy to compare
125 //do not need the whole msg, can be the checksum
126 CpvDeclare(CkCheckPTMessage*, buddyBuf);
127 //pointer of the checkpoint going to be written
128 CpvDeclare(int, curPointer);
129 CpvDeclare(int, recvdRemote);
130 CpvDeclare(int, recvdLocal);
131 CpvDeclare(int, localChkpDone);
132 CpvDeclare(int, remoteChkpDone);
133 CpvDeclare(int, recvdArrayChkp);
134 CpvDeclare(int, recvdProcChkp);
135 CpvDeclare(int, localChecksum);
136 CpvDeclare(int, remoteChecksum);
137
138 bool compare(char * buf1, char * buf2);
139 int getChecksum(char * buf);
140 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
141 // Converse function handles
142 static int askPhaseHandlerIdx;
143 static int recvPhaseHandlerIdx;
144 static int askProcDataHandlerIdx;
145 static int restartBcastHandlerIdx;
146 static int recoverProcDataHandlerIdx;
147 static int restartBeginHandlerIdx;
148 static int recvRemoteChkpHandlerIdx;
149 static int replicaDieHandlerIdx;
150 static int replicaDieBcastHandlerIdx;
151 static int replicaRecoverHandlerIdx;
152 static int replicaChkpDoneHandlerIdx;
153 static int recoverRemoteProcDataHandlerIdx;
154 static int recoverRemoteArrayDataHandlerIdx;
155 static int notifyHandlerIdx;
156 // compute the backup processor
157 // FIXME: avoid crashed processors
158 #if CMK_CONVERSE_MPI
159 static int pingHandlerIdx;
160 static int pingCheckHandlerIdx;
161 static int buddyDieHandlerIdx;
162 static double lastPingTime = -1;
163
164 extern "C" void mpi_restart_crashed(int pe, int rank);
165 extern "C" int  find_spare_mpirank(int pe,int partition);
166
167 void pingBuddy();
168 void pingCheckHandler();
169
170 #endif
171 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
172
173 inline int CkMemCheckPT::BuddyPE(int pe)
174 {
175   int budpe;
176 #if NODE_CHECKPOINT
177   // buddy is the processor with same rank on the next physical node
178   int r1 = CmiPhysicalRank(pe);
179   int budnode = CmiPhysicalNodeID(pe);
180   do {
181     budnode = (budnode+1)%CmiNumPhysicalNodes();
182     int *pelist;
183     int num;
184     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
185     budpe = pelist[r1 % num];
186   } while (isFailed(budpe));
187   if (budpe == pe) {
188     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
189     CmiAbort("Failed to find a buddy processor");
190   }
191 #else
192   budpe = pe;
193   budpe = (budpe+1)%CkNumPes();
194 #endif
195   return budpe;
196 }
197
198 // called in array element constructor
199 // choose and register with 2 buddies for checkpoiting 
200 #if CMK_MEM_CHECKPOINT
201 void ArrayElement::init_checkpt() {
202   if (_memChkptOn == 0) return;
203   if (CkInRestarting()) {
204     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
205   }
206   // only master init checkpoint
207   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
208
209   budPEs[0] = CkMyPe();
210   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
211   CmiAssert(budPEs[0] != budPEs[1]);
212   // inform checkPTMgr
213   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
214   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
215   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
216   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
217 }
218 #endif
219
220 // entry function invoked by checkpoint mgr asking for checkpoint data
221 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
222 #if CMK_MEM_CHECKPOINT
223   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
224   //char index[128];   thisIndexMax.sprint(index);
225   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
226   CkLocMgr *locMgr = thisArray->getLocMgr();
227   CmiAssert(myRec!=NULL);
228   int size;
229   {
230     PUP::sizer p;
231     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
232     size = p.size();
233   }
234   int packSize = size/sizeof(double) +1;
235   CkArrayCheckPTMessage *msg =
236     new (packSize, 0) CkArrayCheckPTMessage;
237   msg->len = size;
238   msg->index =thisIndexMax;
239   msg->aid = thisArrayID;
240   msg->locMgr = locMgr->getGroupID();
241   msg->cp_flag = 1;
242   {
243     PUP::toMem p(msg->packData);
244     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
245   }
246
247   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
248   checkptMgr.recvData(msg, 2, budPEs);
249   delete m;
250 #endif
251 }
252
253 // checkpoint holder class - for memory checkpointing
254 class CkMemCheckPTInfo: public CkCheckPTInfo
255 {
256   CkArrayCheckPTMessage *ckBuffer;
257   public:
258   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
259     CkCheckPTInfo(a, loc, idx, pno)
260   {
261     ckBuffer = NULL;
262   }
263   ~CkMemCheckPTInfo() 
264   {
265     if (ckBuffer) delete ckBuffer; 
266   }
267   inline void updateBuffer(CkArrayCheckPTMessage *data) 
268   {
269     CmiAssert(data!=NULL);
270     if (ckBuffer) delete ckBuffer;
271     ckBuffer = data;
272   }    
273   inline CkArrayCheckPTMessage * getCopy()
274   {
275     if (ckBuffer == NULL) {
276       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
277       CmiAbort("Abort!");
278     }
279     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
280   }     
281   inline void updateBuddy(int b1, int b2) {
282     CmiAssert(ckBuffer);
283     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
284     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
285     CmiAssert(pNo != CkMyPe());
286   }
287   inline int getSize() { 
288     CmiAssert(ckBuffer);
289     return ckBuffer->len; 
290   }
291 };
292
293 // checkpoint holder class - for in-disk checkpointing
294 class CkDiskCheckPTInfo: public CkCheckPTInfo 
295 {
296   char *fname;
297   int bud1, bud2;
298   int len;                      // checkpoint size
299   public:
300   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
301   {
302 #if CMK_USE_MKSTEMP
303     fname = new char[64];
304     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
305     mkstemp(fname);
306 #else
307     fname=tmpnam(NULL);
308 #endif
309     bud1 = bud2 = -1;
310     len = 0;
311   }
312   ~CkDiskCheckPTInfo() 
313   {
314     remove(fname);
315   }
316   inline void updateBuffer(CkArrayCheckPTMessage *data) 
317   {
318     double t = CmiWallTimer();
319     // unpack it
320     envelope *env = UsrToEnv(data);
321     CkUnpackMessage(&env);
322     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
323     FILE *f = fopen(fname,"wb");
324     PUP::toDisk p(f);
325     CkPupMessage(p, (void **)&data);
326     // delay sync to the end because otherwise the messages are blocked
327     //    fsync(fileno(f));
328     fclose(f);
329     bud1 = data->bud1;
330     bud2 = data->bud2;
331     len = data->len;
332     delete data;
333     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
334   }
335   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
336   {
337     CkArrayCheckPTMessage *data;
338     FILE *f = fopen(fname,"rb");
339     PUP::fromDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     fclose(f);
342     data->bud1 = bud1;                          // update the buddies
343     data->bud2 = bud2;
344     return data;
345   }
346   inline void updateBuddy(int b1, int b2) {
347     bud1 = b1; bud2 = b2;
348     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
349     CmiAssert(pNo != CkMyPe());
350   }
351   inline int getSize() { 
352     return len; 
353   }
354 };
355
356 CkMemCheckPT::CkMemCheckPT(int w)
357 {
358   int numnodes = 0;
359 #if NODE_CHECKPOINT
360   numnodes = CmiNumPhysicalNodes();
361 #else
362   numnodes = CkNumPes();
363 #endif
364 #if CK_NO_PROC_POOL
365   if (numnodes <= 2)
366 #else
367     if (numnodes  == 1)
368 #endif
369     {
370       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
371       _memChkptOn = 0;
372     }
373   inRestarting = 0;
374   inCheckpointing = 0;
375   recvCount = peCount = 0;
376   ackCount = 0;
377   expectCount = -1;
378   where = w;
379   replicaAlive = 1;
380   notifyReplica = 0;
381 #if CMK_CONVERSE_MPI
382   void pingBuddy();
383   void pingCheckHandler();
384   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
385   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
386 #endif
387   chkpTable[0] = NULL;
388   chkpTable[1] = NULL;
389   maxIter = -1;
390   recvIterCount = 0;
391   localDecided = false;
392 }
393
394 CkMemCheckPT::~CkMemCheckPT()
395 {
396   int len = ckTable.length();
397   for (int i=0; i<len; i++) {
398     delete ckTable[i];
399   }
400 }
401
402 void CkMemCheckPT::pup(PUP::er& p) 
403
404   CBase_CkMemCheckPT::pup(p); 
405   p|cpStarter;
406   p|thisFailedPe;
407   p|failedPes;
408   p|ckCheckPTGroupID;           // recover global variable
409   p|cpCallback;                 // store callback
410   p|where;                      // where to checkpoint
411   p|peCount;
412   if (p.isUnpacking()) {
413     recvCount = 0;
414 #if CMK_CONVERSE_MPI
415     void pingBuddy();
416     void pingCheckHandler();
417     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
418     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
419 #endif
420     maxIter = -1;
421     recvIterCount = 0;
422     localDecided = false;
423   }
424 }
425
426 void CkMemCheckPT::getIter(){
427   localDecided = true;
428   localMaxIter = maxIter+1;
429   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
430   int elemCount = CkCountChkpSyncElements();
431   if(elemCount == 0){
432     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
433   }
434 }
435
436 //when one replica failes, decide the next chkp iter
437
438 void CkMemCheckPT::recvIter(int iter){
439   if(maxIter<iter){
440     maxIter=iter;
441   }
442 }
443
444 void CkMemCheckPT::recvMaxIter(int iter){
445   localDecided = false;
446   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
447 }
448
449 void CkMemCheckPT::reachChkpIter(){
450   recvIterCount++;
451   elemCount = CkCountChkpSyncElements();
452   if(recvIterCount == elemCount){
453     recvIterCount = 0;
454     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
455   }
456 }
457
458 void CkMemCheckPT::startChkp(){
459   CkPrintf("start checkpoint\n");
460   CkStartMemCheckpoint(cpCallback);
461 }
462
463 // called by checkpoint mgr to restore an array element
464 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
465 {
466 #if CMK_MEM_CHECKPOINT
467   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
468   // m->index.print();
469   PUP::fromMem p(m->packData);
470   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
471   CmiAssert(mgr);
472 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
473   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
474 #else
475   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
476 #endif
477
478   // find a list of array elements bound together
479   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
480   CmiAssert(elt);
481   CkLocRec_local *rec = elt->myRec;
482   CkVec<CkMigratable *> list;
483   mgr->migratableList(rec, list);
484   CmiAssert(list.length() > 0);
485   for (int l=0; l<list.length(); l++) {
486     elt = (ArrayElement *)list[l];
487     elt->budPEs[0] = m->bud1;
488     elt->budPEs[1] = m->bud2;
489     //    reset, may not needed now
490     // for now.
491     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
492       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
493       if (c) c->redNo = 0;
494     }
495   }
496 #endif
497 }
498
499 // return 1 if pe is a crashed processor
500 int CkMemCheckPT::isFailed(int pe)
501 {
502   for (int i=0; i<failedPes.length(); i++)
503     if (failedPes[i] == pe) return 1;
504   return 0;
505 }
506
507 // add pe into history list of all failed processors
508 void CkMemCheckPT::failed(int pe)
509 {
510   if (isFailed(pe)) return;
511   failedPes.push_back(pe);
512 }
513
514 int CkMemCheckPT::totalFailed()
515 {
516   return failedPes.length();
517 }
518
519 // create an checkpoint entry for array element of aid with index.
520 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
521 {
522   // error check, no duplicate
523   int idx, len = ckTable.size();
524   for (idx=0; idx<len; idx++) {
525     CkCheckPTInfo *entry = ckTable[idx];
526     if (index == entry->index) {
527       if (loc == entry->locMgr) {
528         // bindTo array elements
529         return;
530       }
531       // for array inheritance, the following check may fail
532       // because ArrayElement constructor of all superclasses are called
533       if (aid == entry->aid) {
534         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
535         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
536       }
537     }
538   }
539   CkCheckPTInfo *newEntry;
540   if (where == CkCheckPoint_inMEM)
541     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
542   else
543     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
544   ckTable.push_back(newEntry);
545   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
546 }
547
548 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
549 {
550 #if !CMK_CHKP_ALL       
551   int buddy = msg->bud1;
552   if (buddy == CkMyPe()) buddy = msg->bud2;
553   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
554   recvData(msg);
555   // ack
556   thisProxy[buddy].gotData();
557 #else
558   chkpTable[0] = NULL;
559   chkpTable[1] = NULL;
560   recvArrayCheckpoint(msg);
561   thisProxy[msg->bud2].gotData();
562 #endif
563 }
564
565 // loop through my checkpoint table and ask checkpointed array elements
566 // to send me checkpoint data.
567 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
568 {
569   checkpointed = 1;
570   cpCallback = cb;
571   cpStarter = starter;
572   inCheckpointing = 1;
573   if (CkMyPe() == cpStarter) {
574     startTime = CmiWallTimer();
575     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
576   }
577 #if CMK_CONVERSE_MPI
578   if(CmiNumPartition()==1)
579 #endif    
580   {
581
582 #if !CMK_CHKP_ALL
583     int len = ckTable.length();
584     for (int i=0; i<len; i++) {
585       CkCheckPTInfo *entry = ckTable[i];
586       // always let the bigger number processor send request
587       //if (CkMyPe() < entry->pNo) continue;
588       // always let the smaller number processor send request, may on same proc
589       if (!isMaster(entry->pNo)) continue;
590       // call inmem_checkpoint to the array element, ask it to send
591       // back checkpoint data via recvData().
592       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
593       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
594     }
595     // if my table is empty, then I am done
596     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
597 #else
598     startArrayCheckpoint();
599 #endif
600     sendProcData();
601   }
602 #if CMK_CONVERSE_MPI
603   else
604   {
605     startCheckpoint();
606   }
607 #endif
608   // pack and send proc level data
609 }
610
611 class MemElementPacker : public CkLocIterator{
612   private:
613     //    CkLocMgr *locMgr;
614     PUP::er &p;
615     std::map<CkHashCode,CkLocation> arrayMap;
616   public:
617     MemElementPacker(PUP::er &p_):p(p_){};
618     void addLocation(CkLocation &loc){
619       CkArrayIndexMax idx = loc.getIndex();
620       arrayMap[idx.hash()] = loc;
621     }
622     void writeCheckpoint(){
623       std::map<CkHashCode, CkLocation>::iterator it;
624       for(it = arrayMap.begin();it!=arrayMap.end();it++){
625         CkLocation loc = it->second;
626         CkLocMgr *locMgr = loc.getManager();
627         CkArrayIndexMax idx = loc.getIndex();
628         CkGroupID gID = locMgr->ckGetGroupID();
629         p|gID;
630         p|idx;
631         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
632       }
633     }
634 };
635
636 void pupAllElements(PUP::er &p){
637 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
638   int numElements;
639   if(!p.isUnpacking()){
640     numElements = CkCountArrayElements();
641   }
642   p | numElements;
643   if(!p.isUnpacking()){
644     MemElementPacker packer(p);
645     CKLOCMGR_LOOP(mgr->iterate(packer););
646     packer.writeCheckpoint();
647   }
648 #endif
649 }
650
651 void CkMemCheckPT::startArrayCheckpoint(){
652 #if CMK_CHKP_ALL
653   int size;
654   {
655     PUP::sizer psizer;
656     pupAllElements(psizer);
657     size = psizer.size();
658   }
659   int packSize = size/sizeof(double)+1;
660   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
661   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
662   msg->len = size;
663   msg->cp_flag = 1;
664   int budPEs[2];
665   msg->bud1=CkMyPe();
666   msg->bud2=ChkptOnPe(CkMyPe());
667   {
668     PUP::toMem p(msg->packData);
669     pupAllElements(p);
670   }
671   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
672   if(chkpTable[0]) delete chkpTable[0];
673   chkpTable[0] = msg;
674   //send the checkpoint to my 
675   recvCount++;
676 #endif
677 }
678
679 void CkMemCheckPT::startCheckpoint(){
680 #if CMK_CONVERSE_MPI
681   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
682     CkPrintf("in start checkpointing!!!!\n");
683   int size;
684   {
685     PUP::sizer p;
686     _handleProcData(p,CmiFalse);
687     size = p.size();
688   }
689   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
690   procMsg->len = size;
691   procMsg->cp_flag = 1;
692   {
693     PUP::toMem p(procMsg->packData);
694     _handleProcData(p,CmiFalse);
695   }
696   int pointer = CpvAccess(curPointer);
697   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
698   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
699
700   {
701     PUP::sizer psizer;
702     pupAllElements(psizer);
703     size = psizer.size();
704   }
705   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
706   msg->len = size;
707   msg->cp_flag = 1;
708   int checksum;
709   {
710 #if CMK_USE_CHECKSUM
711     PUP::checker p(msg->packData);
712     pupAllElements(p);
713     checksum = p.getChecksum();
714     CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
715 #else 
716     PUP::toMem p(msg->packData);
717     pupAllElements(p);
718 #endif
719   }
720   pointer = CpvAccess(curPointer);
721   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
722   CpvAccess(chkpBuf)[pointer] = msg;
723   if(CkMyPe()==0)
724     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
725   if(CkReplicaAlive()==1){
726     CpvAccess(recvdLocal) = 1;
727 #if CMK_USE_CHECKSUM
728 //    CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
729 //    CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
730 //    delete tmpMsg;
731     CpvAccess(localChecksum) = checksum;
732     char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
733     *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
734     CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
735     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
736 #else
737     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
738     CkPackMessage(&env);
739     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
740     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
741 #endif
742   }
743   if(CpvAccess(recvdRemote)==1){
744     //compare the checkpoint 
745     int size = CpvAccess(chkpBuf)[pointer]->len;
746 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
747 #if CMK_USE_CHECKSUM
748     if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
749       thisProxy[CkMyPe()].doneComparison(true);
750     }
751 #else
752     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
753       thisProxy[CkMyPe()].doneComparison(true);
754     }
755 #endif
756     else{
757       //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
758       thisProxy[CkMyPe()].doneComparison(false);
759     }
760     if(CkMyPe()==0)
761       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
762   }
763   else{
764     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
765       { 
766         int pointer = CpvAccess(curPointer);
767         //send the proc data
768         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
769         procMsg->pointer = pointer;
770         envelope * env = (envelope *)(UsrToEnv(procMsg));
771         CkPackMessage(&env);
772         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
773         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
774         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
775           CkPrintf("[%d] sendProcdata\n",CkMyPe());
776         }
777       }
778       //send the array checkpoint data
779       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
780       msg->pointer = CpvAccess(curPointer);
781       envelope * env = (envelope *)(UsrToEnv(msg));
782       CkPackMessage(&env);
783       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
784       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
785       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
786         CkPrintf("[%d] sendArraydata\n",CkMyPe());
787       //can continue work, no need to wait for my replica
788     }
789   }
790 #endif
791 }
792
793 void CkMemCheckPT::doneComparison(bool ret){
794   int _ret = 1;
795   if(!ret){
796     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
797     _ret = 0;
798   }else{
799     _ret = 1;
800   }
801   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
802   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
803 }
804
805 void CkMemCheckPT::doneRComparison(int ret){
806   //    if(CpvAccess(curPointer) == 0){
807   if(ret==CkNumPes()){
808     CpvAccess(localChkpDone) = 1;
809     if(CpvAccess(remoteChkpDone) ==1){
810       thisProxy.doneBothComparison();
811     }
812     if(notifyReplica == 0){
813       //notify the replica am done
814       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
815       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
816       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
817       notifyReplica = 1;
818     }
819   }
820   else{
821     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
822     startTime = CmiWallTimer();
823     thisProxy.RollBack();
824   }
825 }
826
827 void CkMemCheckPT::doneBothComparison(){
828   CpvAccess(recvdRemote) = 0;
829   CpvAccess(recvdLocal) = 0;
830   CpvAccess(localChkpDone) = 0;
831   CpvAccess(remoteChkpDone) = 0;
832   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
833   CpvAccess(curPointer)^=1;
834   inCheckpointing = 0;
835   notifyReplica = 0;
836   if(CkMyPe() == 0){
837     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
838   }
839   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
840 }
841
842 void CkMemCheckPT::RollBack(){
843   //restore group data
844   checkpointed = 0;
845   CkMemCheckPT::inRestarting = 1;
846   int pointer = CpvAccess(curPointer)^1;//use the previous one
847   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
848   PUP::fromMem p(chkpMsg->packData);    
849
850   //destroy array elements
851   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
852   int numGroups = CkpvAccess(_groupIDTable)->size();
853   for(int i=0;i<numGroups;i++) {
854     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
855     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
856     obj->flushStates();
857     obj->ckJustMigrated();
858   }
859   //restore array elements
860
861   int numElements;
862   p|numElements;
863
864   if(p.isUnpacking()){
865     for(int i=0;i<numElements;i++){
866       CkGroupID gID;
867       CkArrayIndex idx;
868       p|gID;
869       p|idx;
870       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
871       CmiAssert(mgr);
872       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
873     }
874     }
875     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
876     contribute(cb);
877   }
878
879   void CkMemCheckPT::notifyReplicaDie(int pe){
880     replicaAlive = 0;
881     CpvAccess(_remoteCrashedNode) = pe;
882   }
883
884   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
885   {
886 #if CMK_CHKP_ALL
887     int idx = 1;
888     if(msg->bud1 == CkMyPe()){
889       idx = 0;
890     }
891     int isChkpting = msg->cp_flag;
892     if(isChkpting == 1){
893       if(chkpTable[idx]) delete chkpTable[idx];
894     }
895     chkpTable[idx] = msg;
896     if(isChkpting){
897       recvCount++;
898       if(recvCount == 2){
899         if (where == CkCheckPoint_inMEM) {
900           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
901         }
902         else if (where == CkCheckPoint_inDISK) {
903           // another barrier for finalize the writing using fsync
904           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
905           contribute(0,NULL,CkReduction::sum_int,localcb);
906         }
907         else
908           CmiAbort("Unknown checkpoint scheme");
909         recvCount = 0;
910       }
911     }
912 #endif
913   }
914
915   // don't handle array elements
916   static inline void _handleProcData(PUP::er &p, CmiBool create)
917   {
918     // save readonlys, and callback BTW
919     CkPupROData(p);
920
921     // save mainchares 
922     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
923
924 #ifndef CMK_CHARE_USE_PTR
925     // save non-migratable chare
926     CkPupChareData(p);
927 #endif
928
929     // save groups into Groups.dat
930     CkPupGroupData(p,create);
931
932     // save nodegroups into NodeGroups.dat
933     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
934   }
935
936   void CkMemCheckPT::sendProcData()
937   {
938     // find out size of buffer
939     int size;
940     {
941       PUP::sizer p;
942       _handleProcData(p,CmiTrue);
943       size = p.size();
944     }
945     int packSize = size;
946     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
947     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
948     {
949       PUP::toMem p(msg->packData);
950       _handleProcData(p,CmiTrue);
951     }
952     msg->pe = CkMyPe();
953     msg->len = size;
954     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
955     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
956   }
957
958   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
959   {
960     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
961     CpvAccess(procChkptBuf) = msg;
962     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
963     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
964   }
965
966   // ArrayElement call this function to give us the checkpointed data
967   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
968   {
969     int len = ckTable.length();
970     int idx;
971     for (idx=0; idx<len; idx++) {
972       CkCheckPTInfo *entry = ckTable[idx];
973       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
974     }
975     CkAssert(idx < len);
976     int isChkpting = msg->cp_flag;
977     ckTable[idx]->updateBuffer(msg);
978     if (isChkpting) {
979       // all my array elements have returned their inmem data
980       // inform starter processor that I am done.
981       recvCount ++;
982       if (recvCount == ckTable.length()) {
983         if (where == CkCheckPoint_inMEM) {
984           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
985         }
986         else if (where == CkCheckPoint_inDISK) {
987           // another barrier for finalize the writing using fsync
988           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
989           contribute(0,NULL,CkReduction::sum_int,localcb);
990         }
991         else
992           CmiAbort("Unknown checkpoint scheme");
993         recvCount = 0;
994       } 
995     }
996   }
997
998   // only used in disk checkpointing
999   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1000   {
1001     delete m;
1002 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1003     system("sync");
1004 #endif
1005     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1006   }
1007
1008   // only is called on cpStarter when checkpoint is done
1009   void CkMemCheckPT::cpFinish()
1010   {
1011     CmiAssert(CkMyPe() == cpStarter);
1012     peCount++;
1013     // now that all processors have finished, activate callback
1014     if (peCount == 2) 
1015     {
1016       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1017       peCount = 0;
1018       thisProxy.report();
1019     }
1020   }
1021
1022   // for debugging, report checkpoint info
1023   void CkMemCheckPT::report()
1024   {
1025     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1026     inCheckpointing = 0;
1027 #if !CMK_CHKP_ALL
1028     int objsize = 0;
1029     int len = ckTable.length();
1030     for (int i=0; i<len; i++) {
1031       CkCheckPTInfo *entry = ckTable[i];
1032       CmiAssert(entry);
1033       objsize += entry->getSize();
1034     }
1035     CmiAssert(CpvAccess(procChkptBuf));
1036     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1037 #else
1038     if(CkMyPe()==0)
1039       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1040 #endif
1041   }
1042
1043   /*****************************************************************************
1044     RESTART Procedure
1045    *****************************************************************************/
1046
1047   // master processor of two buddies
1048   inline int CkMemCheckPT::isMaster(int buddype)
1049   {
1050 #if 0
1051     int mype = CkMyPe();
1052     //CkPrintf("ismaster: %d %d\n", pe, mype);
1053     if (CkNumPes() - totalFailed() == 2) {
1054       return mype > buddype;
1055     }
1056     for (int i=1; i<CkNumPes(); i++) {
1057       int me = (buddype+i)%CkNumPes();
1058       if (isFailed(me)) continue;
1059       if (me == mype) return 1;
1060       else return 0;
1061     }
1062     return 0;
1063 #else
1064     // smaller one
1065     int mype = CkMyPe();
1066     //CkPrintf("ismaster: %d %d\n", pe, mype);
1067     if (CkNumPes() - totalFailed() == 2) {
1068       return mype < buddype;
1069     }
1070 #if NODE_CHECKPOINT
1071     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1072     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1073 #else
1074       for (int i=1; i<CkNumPes(); i++) {
1075 #endif
1076         int me = (mype+i)%CkNumPes();
1077         if (isFailed(me)) continue;
1078         if (me == buddype) return 1;
1079         else return 0;
1080       }
1081       return 0;
1082 #endif
1083     }
1084
1085
1086
1087 #if 0
1088     // helper class to pup all elements that belong to same ckLocMgr
1089     class ElementDestoryer : public CkLocIterator {
1090       private:
1091         CkLocMgr *locMgr;
1092       public:
1093         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1094         void addLocation(CkLocation &loc) {
1095           CkArrayIndex idx=loc.getIndex();
1096           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1097           loc.destroy();
1098         }
1099     };
1100 #endif
1101
1102     // restore the bitmap vector for LB
1103     void CkMemCheckPT::resetLB(int diepe)
1104     {
1105 #if CMK_LBDB_ON
1106       int i;
1107       char *bitmap = new char[CkNumPes()];
1108       // set processor available bitmap
1109       get_avail_vector(bitmap);
1110       for (i=0; i<failedPes.length(); i++)
1111         bitmap[failedPes[i]] = 0; 
1112       bitmap[diepe] = 0;
1113
1114 #if CK_NO_PROC_POOL
1115       set_avail_vector(bitmap);
1116 #endif
1117
1118       // if I am the crashed pe, rebuild my failedPEs array
1119       if (CkMyNode() == diepe)
1120         for (i=0; i<CkNumPes(); i++) 
1121           if (bitmap[i]==0) failed(i);
1122
1123       delete [] bitmap;
1124 #endif
1125     }
1126
1127     static double restartT;
1128
1129     // in case when failedPe dies, everybody go through its checkpoint table:
1130     // destory all array elements
1131     // recover lost buddies
1132     // reconstruct all array elements from check point data
1133     // called on all processors
1134     void CkMemCheckPT::restart(int diePe)
1135     {
1136 #if CMK_MEM_CHECKPOINT
1137       double curTime = CmiWallTimer();
1138       if (CkMyPe() == diePe){
1139         restartT = CmiWallTimer();
1140         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1141       }
1142       stage = (char*)"resetLB";
1143       startTime = curTime;
1144       if (CkMyPe() == diePe)
1145         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1146
1147 #if CK_NO_PROC_POOL
1148       failed(diePe);    // add into the list of failed pes
1149 #endif
1150       thisFailedPe = diePe;
1151
1152       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1153
1154       inRestarting = 1;
1155
1156       // disable load balancer's barrier
1157       //if (CkMyPe() != diePe) resetLB(diePe);
1158
1159       CKLOCMGR_LOOP(mgr->startInserting(););
1160
1161
1162       if(CmiNumPartition()==1){
1163         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1164       }else{
1165         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1166         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1167       }
1168 #endif
1169     }
1170
1171     // loally remove all array elements
1172     void CkMemCheckPT::removeArrayElements()
1173     {
1174 #if CMK_MEM_CHECKPOINT
1175       int len = ckTable.length();
1176       double curTime = CmiWallTimer();
1177       if (CkMyPe() == thisFailedPe) 
1178         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1179       stage = (char*)"removeArrayElements";
1180       startTime = curTime;
1181
1182       //  if (cpCallback.isInvalid()) 
1183       //          CkPrintf("invalid pe %d\n",CkMyPe());
1184       //          CkAbort("Didn't set restart callback\n");;
1185       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1186
1187       // get rid of all buffering and remote recs
1188       // including destorying all array elements
1189 #if CK_NO_PROC_POOL  
1190       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1191 #else
1192       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1193 #endif
1194       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1195 #endif
1196     }
1197
1198     // flush state in reduction manager
1199     void CkMemCheckPT::resetReductionMgr()
1200     {
1201       if (CkMyPe() == thisFailedPe) 
1202         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1203       int numGroups = CkpvAccess(_groupIDTable)->size();
1204       for(int i=0;i<numGroups;i++) {
1205         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1206         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1207         obj->flushStates();
1208         obj->ckJustMigrated();
1209       }
1210       // reset again
1211       //CpvAccess(_qd)->flushStates();
1212       if(CmiNumPartition()==1){
1213         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1214       }
1215       else
1216         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1217     }
1218
1219     // recover the lost buddies
1220     void CkMemCheckPT::recoverBuddies()
1221     {
1222       int idx;
1223       int len = ckTable.length();
1224       // ready to flush reduction manager
1225       // cannot be CkMemCheckPT::restart because destory will modify states
1226       double curTime = CmiWallTimer();
1227       if (CkMyPe() == thisFailedPe)
1228         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1229       stage = (char *)"recoverBuddies";
1230       if (CkMyPe() == thisFailedPe)
1231         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1232       startTime = curTime;
1233
1234       // recover buddies
1235       expectCount = 0;
1236 #if !CMK_CHKP_ALL
1237       for (idx=0; idx<len; idx++) {
1238         CkCheckPTInfo *entry = ckTable[idx];
1239         if (entry->pNo == thisFailedPe) {
1240 #if CK_NO_PROC_POOL
1241           // find a new buddy
1242           int budPe = BuddyPE(CkMyPe());
1243 #else
1244           int budPe = thisFailedPe;
1245 #endif
1246           CkArrayCheckPTMessage *msg = entry->getCopy();
1247           msg->bud1 = budPe;
1248           msg->bud2 = CkMyPe();
1249           msg->cp_flag = 0;            // not checkpointing
1250           thisProxy[budPe].recoverEntry(msg);
1251           expectCount ++;
1252         }
1253       }
1254 #else
1255       //send to failed pe
1256       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1257 #if CK_NO_PROC_POOL
1258         // find a new buddy
1259         int budPe = BuddyPE(CkMyPe());
1260 #else
1261         int budPe = thisFailedPe;
1262 #endif
1263         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1264         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1265         msg->cp_flag = 0;            // not checkpointing
1266         msg->bud1 = budPe;
1267         msg->bud2 = CkMyPe();
1268         thisProxy[budPe].recoverEntry(msg);
1269         expectCount ++;
1270       }
1271 #endif
1272
1273       if (expectCount == 0) {
1274         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1275       }
1276       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1277     }
1278
1279     void CkMemCheckPT::gotData()
1280     {
1281       ackCount ++;
1282       if (ackCount == expectCount) {
1283         ackCount = 0;
1284         expectCount = -1;
1285         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1286         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1287       }
1288     }
1289
1290     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1291     {
1292
1293       for (int i=0; i<n; i++) {
1294         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1295         mgr->updateLocation(idx[i], nowOnPe);
1296       }
1297       thisProxy[nowOnPe].gotReply();
1298     }
1299
1300     // restore array elements
1301     void CkMemCheckPT::recoverArrayElements()
1302     {
1303       double curTime = CmiWallTimer();
1304       int len = ckTable.length();
1305       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1306       stage = (char *)"recoverArrayElements";
1307       if (CkMyPe() == thisFailedPe)
1308         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1309       startTime = curTime;
1310       int flag = 0;
1311       // recover all array elements
1312       int count = 0;
1313
1314 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1315       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1316       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1317 #endif
1318
1319 #if !CMK_CHKP_ALL
1320       for (int idx=0; idx<len; idx++)
1321       {
1322         CkCheckPTInfo *entry = ckTable[idx];
1323 #if CK_NO_PROC_POOL
1324         // the bigger one will do 
1325         //    if (CkMyPe() < entry->pNo) continue;
1326         if (!isMaster(entry->pNo)) continue;
1327 #else
1328         // smaller one do it, which has the original object
1329         if (CkMyPe() == entry->pNo+1 || 
1330             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1331 #endif
1332         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1333
1334         entry->updateBuddy(CkMyPe(), entry->pNo);
1335         CkArrayCheckPTMessage *msg = entry->getCopy();
1336         // gzheng
1337         //thisProxy[CkMyPe()].inmem_restore(msg);
1338         inmem_restore(msg);
1339 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1340         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1341         int homePe = mgr->homePe(msg->index);
1342         if (homePe != CkMyPe()) {
1343           gmap[homePe].push_back(msg->locMgr);
1344           imap[homePe].push_back(msg->index);
1345         }
1346 #endif
1347         CkFreeMsg(msg);
1348         count ++;
1349       }
1350 #else
1351       char * packData;
1352       if(CmiNumPartition()==1){
1353         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1354         packData = (char *)msg->packData;
1355       }
1356       else{
1357         int pointer = CpvAccess(curPointer);
1358         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1359         packData = msg->packData;
1360       }
1361 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1362       recoverAll(packData,gmap,imap);
1363 #else
1364       recoverAll(packData);
1365 #endif
1366 #endif
1367 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1368       for (int i=0; i<CkNumPes(); i++) {
1369         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1370           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1371           flag++;       
1372         }
1373       }
1374       delete [] imap;
1375       delete [] gmap;
1376 #endif
1377       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1378
1379       CKLOCMGR_LOOP(mgr->doneInserting(););
1380
1381       // _crashedNode = -1;
1382       CpvAccess(_crashedNode) = -1;
1383 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1384       if (CkMyPe() == 0)
1385         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1386 #else
1387       if(flag == 0)
1388       {
1389         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1390       }
1391 #endif
1392     }
1393
1394     void CkMemCheckPT::gotReply(){
1395       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1396     }
1397
1398     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1399 #if CMK_CHKP_ALL
1400       PUP::fromMem p(packData);
1401       int numElements;
1402       p|numElements;
1403       if(p.isUnpacking()){
1404         for(int i=0;i<numElements;i++){
1405           CkGroupID gID;
1406           CkArrayIndex idx;
1407           p|gID;
1408           p|idx;
1409           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1410           int homePe = mgr->homePe(idx);
1411 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1412           mgr->resume(idx,p,CmiTrue,CmiTrue);
1413 #else
1414           if(CmiNumPartition()==1)
1415             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1416           else{
1417             if(CkMyPe()==thisFailedPe){
1418               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1419             }
1420             else{
1421               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1422             }
1423           }
1424 #endif
1425 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1426           homePe = mgr->homePe(idx);
1427           if (homePe != CkMyPe()) {
1428             gmap[homePe].push_back(gID);
1429             imap[homePe].push_back(idx);
1430           }
1431 #endif
1432         }
1433       }
1434 #endif
1435     }
1436
1437
1438     // on every processor
1439     // turn load balancer back on
1440     void CkMemCheckPT::finishUp()
1441     {
1442       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1443       //CKLOCMGR_LOOP(mgr->doneInserting(););
1444
1445       if (CkMyPe() == thisFailedPe)
1446       {
1447         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1448         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1449         fflush(stdout);
1450       }
1451       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1452       inRestarting = 0;
1453
1454 #if CMK_CONVERSE_MPI    
1455       if(CmiNumPartition()!=1){
1456         CpvAccess(recvdProcChkp) = 0;
1457         CpvAccess(recvdArrayChkp) = 0;
1458         CpvAccess(curPointer)^=1;
1459         //notify my replica, restart is done
1460         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1461         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1462         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1463       }
1464       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1465         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1466       }
1467 #endif
1468
1469 #if CK_NO_PROC_POOL
1470 #if NODE_CHECKPOINT
1471       int numnodes = CmiNumPhysicalNodes();
1472 #else
1473       int numnodes = CkNumPes();
1474 #endif
1475       if (numnodes-totalFailed() <=2) {
1476         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1477         _memChkptOn = 0;
1478       }
1479 #endif
1480     }
1481
1482     void CkMemCheckPT::recoverFromSoftFailure()
1483     {
1484       inRestarting = 0;
1485       CpvAccess(recvdRemote) = 0;
1486       CpvAccess(recvdLocal) = 0;
1487       CpvAccess(localChkpDone) = 0;
1488       CpvAccess(remoteChkpDone) = 0;
1489       inCheckpointing = 0;
1490       notifyReplica = 0;
1491       if(CkMyPe() == 0){
1492         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1493       }
1494       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1495     }
1496     // called only on 0
1497     void CkMemCheckPT::quiescence(CkCallback &cb)
1498     {
1499       static int pe_count = 0;
1500       pe_count ++;
1501       CmiAssert(CkMyPe() == 0);
1502       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1503       if (pe_count == CkNumPes()) {
1504         pe_count = 0;
1505         cb.send();
1506       }
1507     }
1508
1509     // User callable function - to start a checkpoint
1510     // callback cb is used to pass control back
1511     void CkStartMemCheckpoint(CkCallback &cb)
1512     {
1513 #if CMK_MEM_CHECKPOINT
1514       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1515       /*if (_memChkptOn == 0) {
1516         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1517         cb.send();
1518         return;
1519         }*/
1520       if (CkInRestarting()) {
1521         // trying to checkpointing during restart
1522         cb.send();
1523         return;
1524       }
1525       // store user callback and user data
1526       CkMemCheckPT::cpCallback = cb;
1527
1528       // broadcast to start check pointing
1529       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1530       checkptMgr.doItNow(CkMyPe(), cb);
1531 #else
1532       // when mem checkpoint is disabled, invike cb immediately
1533       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1534       cb.send();
1535 #endif
1536     }
1537
1538     void CkRestartCheckPoint(int diePe)
1539     {
1540       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1541       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1542       // broadcast
1543       checkptMgr.restart(diePe);
1544     }
1545
1546     static int _diePE = -1;
1547
1548     // callback function used locally by ccs handler
1549     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1550     {
1551       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1552       CkRestartCheckPoint(_diePE);
1553     }
1554
1555
1556     // called on crashed PE
1557     static void restartBeginHandler(char *msg)
1558     {
1559       CmiFree(msg);
1560 #if CMK_MEM_CHECKPOINT
1561 #if CMK_USE_BARRIER
1562       if(CkMyPe()!=_diePE){
1563         printf("restar begin on %d\n",CkMyPe());
1564         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1565         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1566         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1567       }else{
1568         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1569         CkRestartCheckPointCallback(NULL, NULL);
1570       }
1571 #else
1572       static int count = 0;
1573       CmiAssert(CkMyPe() == _diePE);
1574       count ++;
1575       if (count == CkNumPes()) {
1576         printf("restart begin on %d\n",CkMyPe());
1577         CkRestartCheckPointCallback(NULL, NULL);
1578         count = 0;
1579       }
1580 #endif
1581 #endif
1582     }
1583
1584     extern void _discard_charm_message();
1585     extern void _resume_charm_message();
1586
1587     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1588       return data;
1589     }
1590
1591     static void restartBcastHandler(char *msg)
1592     {
1593 #if CMK_MEM_CHECKPOINT
1594       // advance phase counter
1595       CkMemCheckPT::inRestarting = 1;
1596       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1597       // gzheng
1598       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1599
1600       if (CkMyPe()==_diePE)
1601         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1602
1603       // reset QD counters
1604       /*  gzheng
1605           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1606        */
1607
1608       /*  gzheng
1609           if (CkMyPe()==_diePE)
1610           CkRestartCheckPointCallback(NULL, NULL);
1611        */
1612       CmiFree(msg);
1613
1614       _resume_charm_message();
1615
1616       // reduction
1617       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1618       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1619 #if CMK_USE_BARRIER
1620       //CmiPrintf("before reduce\n");   
1621       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1622       //CmiPrintf("after reduce\n");    
1623 #else
1624       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1625 #endif 
1626       checkpointed = 0;
1627 #endif
1628     }
1629
1630     extern void _initDone();
1631
1632     bool compare(char * buf1, char *buf2){
1633       if(CkMyPe()==0)
1634         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1635       PUP::checker pchecker(buf1,buf2);
1636       pchecker.skip();
1637       int numElements;
1638       pchecker|numElements;
1639       for(int i=0;i<numElements;i++){
1640         CkGroupID gID;
1641         CkArrayIndex idx;
1642
1643         pchecker|gID;
1644         pchecker|idx;
1645
1646         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1647         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1648       }
1649       if(CkMyPe()==0)
1650         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1651       return pchecker.getResult();
1652     }
1653     int getChecksum(char * buf){
1654       PUP::checker pchecker(buf);
1655       pchecker.skip();
1656       int numElements;
1657       pchecker|numElements;
1658 //      CkPrintf("num %d\n",numElements);
1659       for(int i=0;i<numElements;i++){
1660         CkGroupID gID;
1661         CkArrayIndex idx;
1662
1663         pchecker|gID;
1664         pchecker|idx;
1665
1666         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1667         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1668       }
1669       return pchecker.getChecksum();
1670     }
1671
1672     static void recvRemoteChkpHandler(char *msg){
1673 #if CMK_USE_CHECKSUM
1674       if(CkMyPe()==0)
1675         CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1676       CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1677       CpvAccess(recvdRemote) = 1;
1678       if(CpvAccess(recvdLocal)==1){
1679         if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1680           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1681         }
1682         else{
1683           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1684         }
1685       }
1686 #else
1687       envelope *env = (envelope *)msg;
1688       CkUnpackMessage(&env);
1689       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1690       if(CpvAccess(recvdLocal)==1){
1691         int pointer = CpvAccess(curPointer);
1692         int size = CpvAccess(chkpBuf)[pointer]->len;
1693         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1694           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1695         }else
1696         {
1697           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1698         }
1699         delete chkpMsg;
1700         if(CkMyPe()==0)
1701           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1702       }else{
1703         CpvAccess(recvdRemote) = 1;
1704         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1705         CpvAccess(buddyBuf) = chkpMsg;
1706       }
1707 #endif
1708     }
1709
1710     static void replicaRecoverHandler(char *msg){
1711       CpvAccess(_remoteCrashedNode) = -1;
1712       CkMemCheckPT::replicaAlive = 1;
1713       //fflush(stdout);
1714       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1715       bool ret = true;
1716       CpvAccess(remoteChkpDone) = 1;
1717       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1718       CmiFree(msg);
1719
1720     }
1721     static void replicaChkpDoneHandler(char *msg){
1722       CpvAccess(remoteChkpDone) = 1;
1723       if(CpvAccess(localChkpDone) == 1)
1724         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1725       CmiFree(msg);
1726     }
1727
1728     static void replicaDieHandler(char * msg){
1729 #if CMK_CONVERSE_MPI    
1730       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1731       CpvAccess(_remoteCrashedNode) = diePe;
1732       CkMemCheckPT::replicaAlive = 0;
1733       if(CkMyPe()==diePe){
1734         CmiPrintf("pe %d in replicad word die\n",diePe);
1735         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1736         fflush(stdout);
1737       }
1738       find_spare_mpirank(diePe,CmiMyPartition()^1);
1739 #endif
1740       //broadcast to my partition to get local max iter
1741       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1742       CmiFree(msg);
1743     }
1744
1745
1746     static void replicaDieBcastHandler(char *msg){
1747       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1748       CpvAccess(_remoteCrashedNode) = diePe;
1749       CkMemCheckPT::replicaAlive = 0;
1750       CmiFree(msg);
1751     }
1752
1753     static void recoverRemoteProcDataHandler(char *msg){
1754       envelope *env = (envelope *)msg;
1755       CkUnpackMessage(&env);
1756       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1757
1758       //store the checkpoint
1759       int pointer = procMsg->pointer;
1760
1761
1762       if(CkMyPe()==CpvAccess(_crashedNode)){
1763         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1764         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1765         PUP::fromMem p(procMsg->packData);
1766         _handleProcData(p,CmiTrue);
1767         _initDone();
1768       }
1769       else{
1770         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1771         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1772         //_handleProcData(p,CmiFalse);
1773       }
1774       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1775       CKLOCMGR_LOOP(mgr->startInserting(););
1776
1777       CpvAccess(recvdProcChkp) =1;
1778       if(CpvAccess(recvdArrayChkp)==1){
1779         _resume_charm_message();
1780         _diePE = CpvAccess(_crashedNode);
1781         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1782         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1783         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1784       }
1785     }
1786
1787     static void recoverRemoteArrayDataHandler(char *msg){
1788       envelope *env = (envelope *)msg;
1789       CkUnpackMessage(&env);
1790       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1791
1792       //store the checkpoint
1793       int pointer = chkpMsg->pointer;
1794       CpvAccess(curPointer) = pointer;
1795       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1796       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1797       CpvAccess(recvdArrayChkp) =1;
1798       CkMemCheckPT::inRestarting = 1;
1799       if(CpvAccess(recvdProcChkp) == 1){
1800         _resume_charm_message();
1801         _diePE = CpvAccess(_crashedNode);
1802         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1803         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1804         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1805         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1806       }
1807     }
1808
1809     static void recvPhaseHandler(char * msg)
1810     {
1811       CpvAccess(_curRestartPhase)--;
1812       CkMemCheckPT::inRestarting = 1;
1813       CmiFree(msg);
1814     }
1815     // called on crashed processor
1816     static void recoverProcDataHandler(char *msg)
1817     {
1818 #if CMK_MEM_CHECKPOINT
1819       int i;
1820       envelope *env = (envelope *)msg;
1821       CkUnpackMessage(&env);
1822       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1823       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1824       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1825       PUP::fromMem p(procMsg->packData);
1826       _handleProcData(p,CmiTrue);
1827
1828       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1829       // gzheng
1830       CKLOCMGR_LOOP(mgr->startInserting(););
1831
1832       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1833       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1834       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1835       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1836
1837       _initDone();
1838       //   CpvAccess(_qd)->flushStates();
1839       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1840 #endif
1841     }
1842     //for replica, got the phase number from my neighbor processor in the current partition
1843     static void askPhaseHandler(char *msg)
1844     {
1845 #if CMK_MEM_CHECKPOINT
1846       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1847       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1848       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1849       CmiSetHandler(msg, recvPhaseHandlerIdx);
1850       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1851 #endif
1852     }
1853     // called on its backup processor
1854     // get backup message buffer and sent to crashed processor
1855     static void askProcDataHandler(char *msg)
1856     {
1857 #if CMK_MEM_CHECKPOINT
1858       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1859       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1860       if (CpvAccess(procChkptBuf) == NULL)  {
1861         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1862         CkAbort("no checkpoint found");
1863       }
1864       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1865       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1866
1867       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1868
1869       CkPackMessage(&env);
1870       CmiSetHandler(env, recoverProcDataHandlerIdx);
1871       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1872       CpvAccess(procChkptBuf) = NULL;
1873       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1874 #endif
1875     }
1876
1877     // called on PE 0
1878     void qd_callback(void *m)
1879     {
1880       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1881       fflush(stdout);
1882       CkFreeMsg(m);
1883       if(CmiNumPartition()==1){
1884 #ifdef CMK_SMP
1885         for(int i=0;i<CmiMyNodeSize();i++){
1886           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1887           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1888           CmiSetHandler(msg, askProcDataHandlerIdx);
1889           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1890           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1891         }
1892         return;
1893 #endif
1894         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1895         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1896         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1897         CmiSetHandler(msg, askProcDataHandlerIdx);
1898         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1899         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1900       }
1901       else{
1902         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1903         CmiSetHandler(msg, recvPhaseHandlerIdx);
1904         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1905       }
1906     }
1907
1908     // on crashed node
1909     void CkMemRestart(const char *dummy, CkArgMsg *args)
1910     {
1911 #if CMK_MEM_CHECKPOINT
1912       _diePE = CmiMyNode();
1913       CpvAccess( _crashedNode )= CmiMyNode();
1914       CkMemCheckPT::inRestarting = 1;
1915       _discard_charm_message();
1916
1917       /*if(CmiMyRank()==0){
1918         CkCallback cb(qd_callback);
1919         CkStartQD(cb);
1920         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1921         }*/
1922       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1923       if(CmiNumPartition()==1){
1924         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1925         restartT = CmiWallTimer();
1926         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1927         fflush(stdout);
1928         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1929         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1930         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1931         CmiSetHandler(msg, askProcDataHandlerIdx);
1932         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1933         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1934       }
1935       else{
1936         CkCallback cb(qd_callback);
1937         CkStartQD(cb);
1938       }
1939 #else
1940       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1941 #endif
1942     }
1943
1944     // can be called in other files
1945     // return true if it is in restarting
1946     extern "C"
1947       int CkInRestarting()
1948       {
1949 #if CMK_MEM_CHECKPOINT
1950         if (CpvAccess( _crashedNode)!=-1) return 1;
1951         // gzheng
1952         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1953         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1954         return CkMemCheckPT::inRestarting;
1955 #else
1956         return 0;
1957 #endif
1958       }
1959
1960     extern "C"
1961       int CkReplicaAlive()
1962       {
1963         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1964         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1965         return CkMemCheckPT::replicaAlive;
1966
1967         /*if(CkMemCheckPT::replicaDead==1)
1968           return 0;
1969           else          
1970           return 1;*/
1971       }
1972
1973     extern "C"
1974       int CkInCheckpointing()
1975       {
1976         return CkMemCheckPT::inCheckpointing;
1977       }
1978
1979     extern "C"
1980       void CkSetInLdb(){
1981 #if CMK_MEM_CHECKPOINT
1982         CkMemCheckPT::inLoadbalancing = 1;
1983 #endif
1984       }
1985
1986     extern "C"
1987       int CkInLdb(){
1988 #if CMK_MEM_CHECKPOINT
1989         return CkMemCheckPT::inLoadbalancing;
1990 #endif
1991         return 0;
1992       }
1993
1994     extern "C"
1995       void CkResetInLdb(){
1996 #if CMK_MEM_CHECKPOINT
1997         CkMemCheckPT::inLoadbalancing = 0;
1998 #endif
1999       }
2000
2001     /*****************************************************************************
2002       module initialization
2003      *****************************************************************************/
2004
2005     static int arg_where = CkCheckPoint_inMEM;
2006
2007 #if CMK_MEM_CHECKPOINT
2008     void init_memcheckpt(char **argv)
2009     {
2010       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2011         arg_where = CkCheckPoint_inDISK;
2012       }
2013       // initiliazing _crashedNode variable
2014       CpvInitialize(int, _crashedNode);
2015       CpvInitialize(int, _remoteCrashedNode);
2016       CpvAccess(_crashedNode) = -1;
2017       CpvAccess(_remoteCrashedNode) = -1;
2018       init_FI(argv);
2019     }
2020 #endif
2021
2022     class CkMemCheckPTInit: public Chare {
2023       public:
2024         CkMemCheckPTInit(CkArgMsg *m) {
2025 #if CMK_MEM_CHECKPOINT
2026           if (arg_where == CkCheckPoint_inDISK) {
2027             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2028           }
2029           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2030           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2031 #endif
2032         }
2033     };
2034
2035     static void notifyHandler(char *msg)
2036     {
2037 #if CMK_MEM_CHECKPOINT
2038       CmiFree(msg);
2039       /* immediately increase restart phase to filter old messages */
2040       CpvAccess(_curRestartPhase) ++;
2041       CpvAccess(_qd)->flushStates();
2042       _discard_charm_message();
2043
2044 #endif
2045     }
2046
2047     extern "C"
2048       void notify_crash(int node)
2049       {
2050 #ifdef CMK_MEM_CHECKPOINT
2051         CpvAccess( _crashedNode) = node;
2052 #ifdef CMK_SMP
2053         for(int i=0;i<CkMyNodeSize();i++){
2054           CpvAccessOther(_crashedNode,i)=node;
2055         }
2056 #endif
2057         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2058         CkMemCheckPT::inRestarting = 1;
2059
2060         // this may be in interrupt handler, send a message to reset QD
2061         int pe = CmiNodeFirst(CkMyNode());
2062         for(int i=0;i<CkMyNodeSize();i++){
2063           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2064           CmiSetHandler(msg, notifyHandlerIdx);
2065           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2066         }
2067 #endif
2068       }
2069
2070     extern "C" void (*notify_crash_fn)(int node);
2071
2072
2073 #if CMK_CONVERSE_MPI
2074     void buddyDieHandler(char *msg)
2075     {
2076 #if CMK_MEM_CHECKPOINT
2077       // notify
2078       CkMemCheckPT::inRestarting = 1;
2079       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2080       notify_crash(diepe);
2081       // send message to crash pe to let it restart
2082       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2083       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2084       int buddy = obj->BuddyPE(CmiMyPe());
2085       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2086       if (buddy == diepe)  {
2087         mpi_restart_crashed(diepe, newrank);
2088       }
2089 #endif
2090     }
2091
2092     void pingHandler(void *msg)
2093     {
2094       lastPingTime = CmiWallTimer();
2095       CmiFree(msg);
2096     }
2097
2098     void pingCheckHandler()
2099     {
2100 #if CMK_MEM_CHECKPOINT
2101       double now = CmiWallTimer();
2102       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2103         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2104         int i, pe, buddy;
2105         // tell everyone the buddy dies
2106         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2107         for (i = 1; i < CmiNumPes(); i++) {
2108           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2109           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2110         }
2111         buddy = pe;
2112         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2113         fflush(stdout);
2114         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2115         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2116         CmiSetHandler(msg, buddyDieHandlerIdx);
2117         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2118         //send to everyone in the other world
2119         if(CmiNumPartition()!=1){
2120           for(int i=0;i<CmiNumPes();i++){
2121             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2122             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2123             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2124             //fflush(stdout);
2125             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2126             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2127           }
2128         }
2129       }
2130         else 
2131           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2132 #endif
2133       }
2134
2135       void pingBuddy()
2136       {
2137 #if CMK_MEM_CHECKPOINT
2138         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2139         if (obj) {
2140           int buddy = obj->BuddyPE(CkMyPe());
2141           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2142           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2143           CmiSetHandler(msg, pingHandlerIdx);
2144           CmiGetRestartPhase(msg) = 9999;
2145           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2146         }
2147         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2148 #endif
2149       }
2150 #endif
2151
2152       // initproc
2153       void CkRegisterRestartHandler( )
2154       {
2155 #if CMK_MEM_CHECKPOINT
2156         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2157         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2158         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2159         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2160         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2161
2162         //for replica
2163         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2164         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2165         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2166         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2167         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2168         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2169         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2170         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2171         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2172
2173 #if CMK_CONVERSE_MPI
2174         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2175         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2176         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2177 #endif
2178
2179         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2180         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2181         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2182         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2183         CpvInitialize(int,curPointer);
2184         CpvInitialize(int,recvdLocal);
2185         CpvInitialize(int,localChkpDone);
2186         CpvInitialize(int,remoteChkpDone);
2187         CpvInitialize(int,recvdRemote);
2188         CpvInitialize(int,recvdProcChkp);
2189         CpvInitialize(int,localChecksum);
2190         CpvInitialize(int,remoteChecksum);
2191         CpvInitialize(int,recvdArrayChkp);
2192
2193         CpvAccess(procChkptBuf) = NULL;
2194         CpvAccess(buddyBuf) = NULL;
2195         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2196         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2197         CpvAccess(chkpBuf)[0] = NULL;
2198         CpvAccess(chkpBuf)[1] = NULL;
2199         CpvAccess(localProcChkpBuf)[0] = NULL;
2200         CpvAccess(localProcChkpBuf)[1] = NULL;
2201
2202         CpvAccess(curPointer) = 0;
2203         CpvAccess(recvdLocal) = 0;
2204         CpvAccess(localChkpDone) = 0;
2205         CpvAccess(remoteChkpDone) = 0;
2206         CpvAccess(recvdRemote) = 0;
2207         CpvAccess(recvdProcChkp) = 0;
2208         CpvAccess(localChecksum) = 0;
2209         CpvAccess(remoteChecksum) = 0;
2210         CpvAccess(recvdArrayChkp) = 0;
2211
2212         notify_crash_fn = notify_crash;
2213
2214 #if ! CMK_CONVERSE_MPI
2215         // print pid to kill
2216         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2217         //  sleep(4);
2218 #endif
2219 #endif
2220       }
2221
2222
2223       extern "C"
2224         int CkHasCheckpoints()
2225         {
2226           return checkpointed;
2227         }
2228
2229       /// @todo: the following definitions should be moved to a separate file containing
2230       // structures and functions about fault tolerance strategies
2231
2232       /**
2233        *  * @brief: function for killing a process                                             
2234        *   */
2235 #ifdef CMK_MEM_CHECKPOINT
2236 #if CMK_HAS_GETPID
2237       void killLocal(void *_dummy,double curWallTime){
2238         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2239         if(CmiWallTimer()<killTime-1){
2240           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2241         }else{ 
2242 #if CMK_CONVERSE_MPI
2243           CkDieNow();
2244 #else 
2245           kill(getpid(),SIGKILL);                                               
2246 #endif
2247         }              
2248       } 
2249 #else
2250       void killLocal(void *_dummy,double curWallTime){
2251         CmiAbort("kill() not supported!");
2252       }
2253 #endif
2254 #endif
2255
2256 #ifdef CMK_MEM_CHECKPOINT
2257       /**
2258        * @brief: reads the file with the kill information
2259        */
2260       void readKillFile(){
2261         FILE *fp=fopen(killFile,"r");
2262         if(!fp){
2263           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2264           return;
2265         }
2266         int proc;
2267         double sec;
2268         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2269           if(proc == CkMyNode() && CkMyRank() == 0){
2270             killTime = CmiWallTimer()+sec;
2271             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2272             CcdCallFnAfter(killLocal,NULL,sec*1000);
2273           }
2274         }
2275         fclose(fp);
2276       }
2277
2278 #if ! CMK_CONVERSE_MPI
2279       void CkDieNow()
2280       {
2281 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2282         // ignored for non-mpi version
2283         CmiPrintf("[%d] die now.\n", CmiMyPe());
2284         killTime = CmiWallTimer()+0.001;
2285         CcdCallFnAfter(killLocal,NULL,1);
2286 #endif
2287       }
2288 #endif
2289
2290 #endif
2291
2292 #include "CkMemCheckpoint.def.h"
2293
2294