9ff229155e9b7b5d374e63a4d81c20e330c878b0
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, _remoteCrashedNode);
77
78 // static, so that it is accessible from Converse part
79 int CkMemCheckPT::inRestarting = 0;
80 int CkMemCheckPT::inCheckpointing = 0;
81 int CkMemCheckPT::replicaAlive = 1;
82 int CkMemCheckPT::inLoadbalancing = 0;
83 double CkMemCheckPT::startTime;
84 char *CkMemCheckPT::stage;
85 CkCallback CkMemCheckPT::cpCallback;
86
87 int _memChkptOn = 1;                    // checkpoint is on or off
88
89 CkGroupID ckCheckPTGroupID;             // readonly
90
91 static int checkpointed = 0;
92
93 /// @todo the following declarations should be moved into a separate file for all 
94 // fault tolerant strategies
95
96 #ifdef CMK_MEM_CHECKPOINT
97 // name of the kill file that contains processes to be killed 
98 char *killFile;                                               
99 // flag for the kill file         
100 int killFlag=0;
101 // variable for storing the killing time
102 double killTime=0.0;
103 #endif
104
105 #ifdef CKLOCMGR_LOOP
106 #undef CKLOCMGR_LOOP
107 #endif
108 // loop over all CkLocMgr and do "code"
109 #define  CKLOCMGR_LOOP(code)    {       \
110   int numGroups = CkpvAccess(_groupIDTable)->size();    \
111   for(int i=0;i<numGroups;i++) {        \
112     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
113     if(obj->isLocMgr())  {      \
114       CkLocMgr *mgr = (CkLocMgr*)obj;   \
115       code      \
116     }   \
117   }     \
118 }
119
120 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
121 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
122 CpvDeclare(CkCheckPTMessage**, chkpBuf);
123 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
124 //store the checkpoint of the buddy to compare
125 //do not need the whole msg, can be the checksum
126 CpvDeclare(CkCheckPTMessage*, buddyBuf);
127 //pointer of the checkpoint going to be written
128 CpvDeclare(int, curPointer);
129 CpvDeclare(int, recvdRemote);
130 CpvDeclare(int, recvdLocal);
131 CpvDeclare(int, localChkpDone);
132 CpvDeclare(int, remoteChkpDone);
133 CpvDeclare(int, recvdArrayChkp);
134 CpvDeclare(int, recvdProcChkp);
135 CpvDeclare(int, localChecksum);
136 CpvDeclare(int, remoteChecksum);
137
138 bool compare(char * buf1, char * buf2);
139 int getChecksum(char * buf);
140 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
141 // Converse function handles
142 static int askPhaseHandlerIdx;
143 static int recvPhaseHandlerIdx;
144 static int askProcDataHandlerIdx;
145 static int restartBcastHandlerIdx;
146 static int recoverProcDataHandlerIdx;
147 static int restartBeginHandlerIdx;
148 static int recvRemoteChkpHandlerIdx;
149 static int replicaDieHandlerIdx;
150 static int replicaDieBcastHandlerIdx;
151 static int replicaRecoverHandlerIdx;
152 static int replicaChkpDoneHandlerIdx;
153 static int recoverRemoteProcDataHandlerIdx;
154 static int recoverRemoteArrayDataHandlerIdx;
155 static int notifyHandlerIdx;
156 // compute the backup processor
157 // FIXME: avoid crashed processors
158 #if CMK_CONVERSE_MPI
159 static int pingHandlerIdx;
160 static int pingCheckHandlerIdx;
161 static int buddyDieHandlerIdx;
162 static double lastPingTime = -1;
163
164 extern "C" void mpi_restart_crashed(int pe, int rank);
165 extern "C" int  find_spare_mpirank(int pe,int partition);
166
167 void pingBuddy();
168 void pingCheckHandler();
169
170 #endif
171 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
172
173 inline int CkMemCheckPT::BuddyPE(int pe)
174 {
175   int budpe;
176 #if NODE_CHECKPOINT
177   // buddy is the processor with same rank on the next physical node
178   int r1 = CmiPhysicalRank(pe);
179   int budnode = CmiPhysicalNodeID(pe);
180   do {
181     budnode = (budnode+1)%CmiNumPhysicalNodes();
182     int *pelist;
183     int num;
184     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
185     budpe = pelist[r1 % num];
186   } while (isFailed(budpe));
187   if (budpe == pe) {
188     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
189     CmiAbort("Failed to find a buddy processor");
190   }
191 #else
192   budpe = pe;
193   budpe = (budpe+1)%CkNumPes();
194 #endif
195   return budpe;
196 }
197
198 // called in array element constructor
199 // choose and register with 2 buddies for checkpoiting 
200 #if CMK_MEM_CHECKPOINT
201 void ArrayElement::init_checkpt() {
202   if (_memChkptOn == 0) return;
203   if (CkInRestarting()) {
204     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
205   }
206   // only master init checkpoint
207   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
208
209   budPEs[0] = CkMyPe();
210   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
211   CmiAssert(budPEs[0] != budPEs[1]);
212   // inform checkPTMgr
213   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
214   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
215   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
216   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
217 }
218 #endif
219
220 // entry function invoked by checkpoint mgr asking for checkpoint data
221 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
222 #if CMK_MEM_CHECKPOINT
223   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
224   //char index[128];   thisIndexMax.sprint(index);
225   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
226   CkLocMgr *locMgr = thisArray->getLocMgr();
227   CmiAssert(myRec!=NULL);
228   int size;
229   {
230     PUP::sizer p;
231     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
232     size = p.size();
233   }
234   int packSize = size/sizeof(double) +1;
235   CkArrayCheckPTMessage *msg =
236     new (packSize, 0) CkArrayCheckPTMessage;
237   msg->len = size;
238   msg->index =thisIndexMax;
239   msg->aid = thisArrayID;
240   msg->locMgr = locMgr->getGroupID();
241   msg->cp_flag = 1;
242   {
243     PUP::toMem p(msg->packData);
244     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
245   }
246
247   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
248   checkptMgr.recvData(msg, 2, budPEs);
249   delete m;
250 #endif
251 }
252
253 // checkpoint holder class - for memory checkpointing
254 class CkMemCheckPTInfo: public CkCheckPTInfo
255 {
256   CkArrayCheckPTMessage *ckBuffer;
257   public:
258   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
259     CkCheckPTInfo(a, loc, idx, pno)
260   {
261     ckBuffer = NULL;
262   }
263   ~CkMemCheckPTInfo() 
264   {
265     if (ckBuffer) delete ckBuffer; 
266   }
267   inline void updateBuffer(CkArrayCheckPTMessage *data) 
268   {
269     CmiAssert(data!=NULL);
270     if (ckBuffer) delete ckBuffer;
271     ckBuffer = data;
272   }    
273   inline CkArrayCheckPTMessage * getCopy()
274   {
275     if (ckBuffer == NULL) {
276       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
277       CmiAbort("Abort!");
278     }
279     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
280   }     
281   inline void updateBuddy(int b1, int b2) {
282     CmiAssert(ckBuffer);
283     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
284     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
285     CmiAssert(pNo != CkMyPe());
286   }
287   inline int getSize() { 
288     CmiAssert(ckBuffer);
289     return ckBuffer->len; 
290   }
291 };
292
293 // checkpoint holder class - for in-disk checkpointing
294 class CkDiskCheckPTInfo: public CkCheckPTInfo 
295 {
296   char *fname;
297   int bud1, bud2;
298   int len;                      // checkpoint size
299   public:
300   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
301   {
302 #if CMK_USE_MKSTEMP
303     fname = new char[64];
304     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
305     mkstemp(fname);
306 #else
307     fname=tmpnam(NULL);
308 #endif
309     bud1 = bud2 = -1;
310     len = 0;
311   }
312   ~CkDiskCheckPTInfo() 
313   {
314     remove(fname);
315   }
316   inline void updateBuffer(CkArrayCheckPTMessage *data) 
317   {
318     double t = CmiWallTimer();
319     // unpack it
320     envelope *env = UsrToEnv(data);
321     CkUnpackMessage(&env);
322     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
323     FILE *f = fopen(fname,"wb");
324     PUP::toDisk p(f);
325     CkPupMessage(p, (void **)&data);
326     // delay sync to the end because otherwise the messages are blocked
327     //    fsync(fileno(f));
328     fclose(f);
329     bud1 = data->bud1;
330     bud2 = data->bud2;
331     len = data->len;
332     delete data;
333     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
334   }
335   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
336   {
337     CkArrayCheckPTMessage *data;
338     FILE *f = fopen(fname,"rb");
339     PUP::fromDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     fclose(f);
342     data->bud1 = bud1;                          // update the buddies
343     data->bud2 = bud2;
344     return data;
345   }
346   inline void updateBuddy(int b1, int b2) {
347     bud1 = b1; bud2 = b2;
348     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
349     CmiAssert(pNo != CkMyPe());
350   }
351   inline int getSize() { 
352     return len; 
353   }
354 };
355
356 CkMemCheckPT::CkMemCheckPT(int w)
357 {
358   int numnodes = 0;
359 #if NODE_CHECKPOINT
360   numnodes = CmiNumPhysicalNodes();
361 #else
362   numnodes = CkNumPes();
363 #endif
364 #if CK_NO_PROC_POOL
365   if (numnodes <= 2)
366 #else
367     if (numnodes  == 1)
368 #endif
369     {
370       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
371       _memChkptOn = 0;
372     }
373   inRestarting = 0;
374   inCheckpointing = 0;
375   recvCount = peCount = 0;
376   ackCount = 0;
377   expectCount = -1;
378   where = w;
379   replicaAlive = 1;
380   notifyReplica = 0;
381 #if CMK_CONVERSE_MPI
382   void pingBuddy();
383   void pingCheckHandler();
384   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
385   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
386 #endif
387   chkpTable[0] = NULL;
388   chkpTable[1] = NULL;
389   maxIter = -1;
390   recvIterCount = 0;
391   localDecided = false;
392 }
393
394 CkMemCheckPT::~CkMemCheckPT()
395 {
396   int len = ckTable.length();
397   for (int i=0; i<len; i++) {
398     delete ckTable[i];
399   }
400 }
401
402 void CkMemCheckPT::pup(PUP::er& p) 
403
404   CBase_CkMemCheckPT::pup(p); 
405   p|cpStarter;
406   p|thisFailedPe;
407   p|failedPes;
408   p|ckCheckPTGroupID;           // recover global variable
409   p|cpCallback;                 // store callback
410   p|where;                      // where to checkpoint
411   p|peCount;
412   if (p.isUnpacking()) {
413     recvCount = 0;
414 #if CMK_CONVERSE_MPI
415     void pingBuddy();
416     void pingCheckHandler();
417     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
418     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
419 #endif
420     maxIter = -1;
421     recvIterCount = 0;
422     localDecided = false;
423   }
424 }
425
426 void CkMemCheckPT::getIter(){
427   localDecided = true;
428   localMaxIter = maxIter+1;
429   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
430   int elemCount = CkCountChkpSyncElements();
431   if(elemCount == 0){
432     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
433   }
434 }
435
436 //when one replica failes, decide the next chkp iter
437
438 void CkMemCheckPT::recvIter(int iter){
439   if(maxIter<iter){
440     maxIter=iter;
441   }
442 }
443
444 void CkMemCheckPT::recvMaxIter(int iter){
445   localDecided = false;
446   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
447 }
448
449 void CkMemCheckPT::reachChkpIter(){
450   recvIterCount++;
451   elemCount = CkCountChkpSyncElements();
452   if(recvIterCount == elemCount){
453     recvIterCount = 0;
454     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
455   }
456 }
457
458 void CkMemCheckPT::startChkp(){
459   CkPrintf("start checkpoint\n");
460   CkStartMemCheckpoint(cpCallback);
461 }
462
463 // called by checkpoint mgr to restore an array element
464 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
465 {
466 #if CMK_MEM_CHECKPOINT
467   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
468   // m->index.print();
469   PUP::fromMem p(m->packData);
470   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
471   CmiAssert(mgr);
472 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
473   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
474 #else
475   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
476 #endif
477
478   // find a list of array elements bound together
479   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
480   CmiAssert(elt);
481   CkLocRec_local *rec = elt->myRec;
482   CkVec<CkMigratable *> list;
483   mgr->migratableList(rec, list);
484   CmiAssert(list.length() > 0);
485   for (int l=0; l<list.length(); l++) {
486     elt = (ArrayElement *)list[l];
487     elt->budPEs[0] = m->bud1;
488     elt->budPEs[1] = m->bud2;
489     //    reset, may not needed now
490     // for now.
491     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
492       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
493       if (c) c->redNo = 0;
494     }
495   }
496 #endif
497 }
498
499 // return 1 if pe is a crashed processor
500 int CkMemCheckPT::isFailed(int pe)
501 {
502   for (int i=0; i<failedPes.length(); i++)
503     if (failedPes[i] == pe) return 1;
504   return 0;
505 }
506
507 // add pe into history list of all failed processors
508 void CkMemCheckPT::failed(int pe)
509 {
510   if (isFailed(pe)) return;
511   failedPes.push_back(pe);
512 }
513
514 int CkMemCheckPT::totalFailed()
515 {
516   return failedPes.length();
517 }
518
519 // create an checkpoint entry for array element of aid with index.
520 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
521 {
522   // error check, no duplicate
523   int idx, len = ckTable.size();
524   for (idx=0; idx<len; idx++) {
525     CkCheckPTInfo *entry = ckTable[idx];
526     if (index == entry->index) {
527       if (loc == entry->locMgr) {
528         // bindTo array elements
529         return;
530       }
531       // for array inheritance, the following check may fail
532       // because ArrayElement constructor of all superclasses are called
533       if (aid == entry->aid) {
534         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
535         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
536       }
537     }
538   }
539   CkCheckPTInfo *newEntry;
540   if (where == CkCheckPoint_inMEM)
541     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
542   else
543     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
544   ckTable.push_back(newEntry);
545   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
546 }
547
548 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
549 {
550 #if !CMK_CHKP_ALL       
551   int buddy = msg->bud1;
552   if (buddy == CkMyPe()) buddy = msg->bud2;
553   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
554   recvData(msg);
555   // ack
556   thisProxy[buddy].gotData();
557 #else
558   chkpTable[0] = NULL;
559   chkpTable[1] = NULL;
560   recvArrayCheckpoint(msg);
561   thisProxy[msg->bud2].gotData();
562 #endif
563 }
564
565 // loop through my checkpoint table and ask checkpointed array elements
566 // to send me checkpoint data.
567 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
568 {
569   checkpointed = 1;
570   cpCallback = cb;
571   cpStarter = starter;
572   inCheckpointing = 1;
573   if (CkMyPe() == cpStarter) {
574     startTime = CmiWallTimer();
575     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
576   }
577 #if CMK_CONVERSE_MPI
578   if(CmiNumPartition()==1)
579 #endif    
580   {
581
582 #if !CMK_CHKP_ALL
583     int len = ckTable.length();
584     for (int i=0; i<len; i++) {
585       CkCheckPTInfo *entry = ckTable[i];
586       // always let the bigger number processor send request
587       //if (CkMyPe() < entry->pNo) continue;
588       // always let the smaller number processor send request, may on same proc
589       if (!isMaster(entry->pNo)) continue;
590       // call inmem_checkpoint to the array element, ask it to send
591       // back checkpoint data via recvData().
592       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
593       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
594     }
595     // if my table is empty, then I am done
596     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
597 #else
598     startArrayCheckpoint();
599 #endif
600     sendProcData();
601   }
602 #if CMK_CONVERSE_MPI
603   else
604   {
605     startCheckpoint();
606   }
607 #endif
608   // pack and send proc level data
609 }
610
611 class MemElementPacker : public CkLocIterator{
612   private:
613     //    CkLocMgr *locMgr;
614     PUP::er &p;
615     std::map<CkHashCode,CkLocation> arrayMap;
616   public:
617     MemElementPacker(PUP::er &p_):p(p_){};
618     void addLocation(CkLocation &loc){
619       CkArrayIndexMax idx = loc.getIndex();
620       arrayMap[idx.hash()] = loc;
621     }
622     void writeCheckpoint(){
623       std::map<CkHashCode, CkLocation>::iterator it;
624       for(it = arrayMap.begin();it!=arrayMap.end();it++){
625         if(CkMyPe()==0){
626           //                            CkPrintf("[%d][%d] pack %d\n",CmiMyPartition(),CkMyPe(),it->first);
627         }
628         CkLocation loc = it->second;
629         CkLocMgr *locMgr = loc.getManager();
630         CkArrayIndexMax idx = loc.getIndex();
631         CkGroupID gID = locMgr->ckGetGroupID();
632         p|gID;
633         p|idx;
634         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
635       }
636     }
637 };
638
639 void pupAllElements(PUP::er &p){
640 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
641   int numElements;
642   if(!p.isUnpacking()){
643     numElements = CkCountArrayElements();
644   }
645   p | numElements;
646   if(!p.isUnpacking()){
647     MemElementPacker packer(p);
648     CKLOCMGR_LOOP(mgr->iterate(packer););
649     packer.writeCheckpoint();
650   }
651 #endif
652 }
653
654 void CkMemCheckPT::startArrayCheckpoint(){
655 #if CMK_CHKP_ALL
656   int size;
657   {
658     PUP::sizer psizer;
659     pupAllElements(psizer);
660     size = psizer.size();
661   }
662   int packSize = size/sizeof(double)+1;
663   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
664   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
665   msg->len = size;
666   msg->cp_flag = 1;
667   int budPEs[2];
668   msg->bud1=CkMyPe();
669   msg->bud2=ChkptOnPe(CkMyPe());
670   {
671     PUP::toMem p(msg->packData);
672     pupAllElements(p);
673   }
674   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
675   if(chkpTable[0]) delete chkpTable[0];
676   chkpTable[0] = msg;
677   //send the checkpoint to my 
678   recvCount++;
679 #endif
680 }
681
682 void CkMemCheckPT::startCheckpoint(){
683 #if CMK_CONVERSE_MPI
684   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
685     CkPrintf("in start checkpointing!!!!\n");
686   int size;
687   {
688     PUP::sizer p;
689     _handleProcData(p,CmiFalse);
690     size = p.size();
691   }
692   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
693   procMsg->len = size;
694   procMsg->cp_flag = 1;
695   {
696     PUP::toMem p(procMsg->packData);
697     _handleProcData(p,CmiFalse);
698   }
699   int pointer = CpvAccess(curPointer);
700   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
701   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
702
703   {
704     PUP::sizer psizer;
705     pupAllElements(psizer);
706     size = psizer.size();
707   }
708   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
709   msg->len = size;
710   msg->cp_flag = 1;
711   {
712     PUP::toMem p(msg->packData);
713     pupAllElements(p);
714   }
715   pointer = CpvAccess(curPointer);
716   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
717   CpvAccess(chkpBuf)[pointer] = msg;
718   if(CkMyPe()==0)
719     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
720   if(CkReplicaAlive()==1){
721     CpvAccess(recvdLocal) = 1;
722 #if CMK_USE_CHECKSUM
723     CkCheckPTMessage * tmpMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
724     CpvAccess(localChecksum) = getChecksum((char *)(tmpMsg->packData));
725 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),CpvAccess(localChecksum));
726     delete tmpMsg;
727     //send checksum
728     char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
729     *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
730     CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
731     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
732 #else
733     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
734     CkPackMessage(&env);
735     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
736     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
737 #endif
738   }
739   if(CpvAccess(recvdRemote)==1){
740     //compare the checkpoint 
741     int size = CpvAccess(chkpBuf)[pointer]->len;
742 //    CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
743 #if CMK_USE_CHECKSUM
744     if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
745       thisProxy[CkMyPe()].doneComparison(true);
746     }
747 #else
748     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
749       thisProxy[CkMyPe()].doneComparison(true);
750     }
751 #endif
752     else{
753       //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
754       thisProxy[CkMyPe()].doneComparison(false);
755     }
756     if(CkMyPe()==0)
757       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
758   }
759   else{
760     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
761       { 
762         int pointer = CpvAccess(curPointer);
763         //send the proc data
764         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
765         procMsg->pointer = pointer;
766         envelope * env = (envelope *)(UsrToEnv(procMsg));
767         CkPackMessage(&env);
768         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
769         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
770         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
771           CkPrintf("[%d] sendProcdata\n",CkMyPe());
772         }
773       }
774       //send the array checkpoint data
775       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
776       msg->pointer = CpvAccess(curPointer);
777       envelope * env = (envelope *)(UsrToEnv(msg));
778       CkPackMessage(&env);
779       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
780       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
781       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
782         CkPrintf("[%d] sendArraydata\n",CkMyPe());
783       //can continue work, no need to wait for my replica
784     }
785   }
786 #endif
787 }
788
789 void CkMemCheckPT::doneComparison(bool ret){
790   int _ret = 1;
791   if(!ret){
792     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
793     _ret = 0;
794   }else{
795     _ret = 1;
796   }
797   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
798   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
799 }
800
801 void CkMemCheckPT::doneRComparison(int ret){
802   //    if(CpvAccess(curPointer) == 0){
803   if(ret==CkNumPes()){
804     CpvAccess(localChkpDone) = 1;
805     if(CpvAccess(remoteChkpDone) ==1){
806       thisProxy.doneBothComparison();
807     }
808     if(notifyReplica == 0){
809       //notify the replica am done
810       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
811       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
812       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
813       notifyReplica = 1;
814     }
815   }
816   else{
817     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
818     startTime = CmiWallTimer();
819     thisProxy.RollBack();
820   }
821 }
822
823 void CkMemCheckPT::doneBothComparison(){
824   CpvAccess(recvdRemote) = 0;
825   CpvAccess(recvdLocal) = 0;
826   CpvAccess(localChkpDone) = 0;
827   CpvAccess(remoteChkpDone) = 0;
828   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
829   CpvAccess(curPointer)^=1;
830   inCheckpointing = 0;
831   notifyReplica = 0;
832   if(CkMyPe() == 0){
833     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,size);
834   }
835   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
836 }
837
838 void CkMemCheckPT::RollBack(){
839   //restore group data
840   checkpointed = 0;
841   CkMemCheckPT::inRestarting = 1;
842   int pointer = CpvAccess(curPointer)^1;//use the previous one
843   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
844   PUP::fromMem p(chkpMsg->packData);    
845
846   //destroy array elements
847   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
848   int numGroups = CkpvAccess(_groupIDTable)->size();
849   for(int i=0;i<numGroups;i++) {
850     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
851     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
852     obj->flushStates();
853     obj->ckJustMigrated();
854   }
855   //restore array elements
856
857   int numElements;
858   p|numElements;
859
860   if(p.isUnpacking()){
861     for(int i=0;i<numElements;i++){
862       CkGroupID gID;
863       CkArrayIndex idx;
864       p|gID;
865       p|idx;
866       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
867       CmiAssert(mgr);
868       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
869     }
870     }
871     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
872     contribute(cb);
873   }
874
875   void CkMemCheckPT::notifyReplicaDie(int pe){
876     replicaAlive = 0;
877     CpvAccess(_remoteCrashedNode) = pe;
878   }
879
880   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
881   {
882 #if CMK_CHKP_ALL
883     int idx = 1;
884     if(msg->bud1 == CkMyPe()){
885       idx = 0;
886     }
887     int isChkpting = msg->cp_flag;
888     if(isChkpting == 1){
889       if(chkpTable[idx]) delete chkpTable[idx];
890     }
891     chkpTable[idx] = msg;
892     if(isChkpting){
893       recvCount++;
894       if(recvCount == 2){
895         if (where == CkCheckPoint_inMEM) {
896           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
897         }
898         else if (where == CkCheckPoint_inDISK) {
899           // another barrier for finalize the writing using fsync
900           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
901           contribute(0,NULL,CkReduction::sum_int,localcb);
902         }
903         else
904           CmiAbort("Unknown checkpoint scheme");
905         recvCount = 0;
906       }
907     }
908 #endif
909   }
910
911   // don't handle array elements
912   static inline void _handleProcData(PUP::er &p, CmiBool create)
913   {
914     // save readonlys, and callback BTW
915     CkPupROData(p);
916
917     // save mainchares 
918     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
919
920 #ifndef CMK_CHARE_USE_PTR
921     // save non-migratable chare
922     CkPupChareData(p);
923 #endif
924
925     // save groups into Groups.dat
926     CkPupGroupData(p,create);
927
928     // save nodegroups into NodeGroups.dat
929     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
930   }
931
932   void CkMemCheckPT::sendProcData()
933   {
934     // find out size of buffer
935     int size;
936     {
937       PUP::sizer p;
938       _handleProcData(p,CmiTrue);
939       size = p.size();
940     }
941     int packSize = size;
942     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
943     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
944     {
945       PUP::toMem p(msg->packData);
946       _handleProcData(p,CmiTrue);
947     }
948     msg->pe = CkMyPe();
949     msg->len = size;
950     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
951     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
952   }
953
954   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
955   {
956     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
957     CpvAccess(procChkptBuf) = msg;
958     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
959     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
960   }
961
962   // ArrayElement call this function to give us the checkpointed data
963   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
964   {
965     int len = ckTable.length();
966     int idx;
967     for (idx=0; idx<len; idx++) {
968       CkCheckPTInfo *entry = ckTable[idx];
969       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
970     }
971     CkAssert(idx < len);
972     int isChkpting = msg->cp_flag;
973     ckTable[idx]->updateBuffer(msg);
974     if (isChkpting) {
975       // all my array elements have returned their inmem data
976       // inform starter processor that I am done.
977       recvCount ++;
978       if (recvCount == ckTable.length()) {
979         if (where == CkCheckPoint_inMEM) {
980           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
981         }
982         else if (where == CkCheckPoint_inDISK) {
983           // another barrier for finalize the writing using fsync
984           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
985           contribute(0,NULL,CkReduction::sum_int,localcb);
986         }
987         else
988           CmiAbort("Unknown checkpoint scheme");
989         recvCount = 0;
990       } 
991     }
992   }
993
994   // only used in disk checkpointing
995   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
996   {
997     delete m;
998 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
999     system("sync");
1000 #endif
1001     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1002   }
1003
1004   // only is called on cpStarter when checkpoint is done
1005   void CkMemCheckPT::cpFinish()
1006   {
1007     CmiAssert(CkMyPe() == cpStarter);
1008     peCount++;
1009     // now that all processors have finished, activate callback
1010     if (peCount == 2) 
1011     {
1012       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1013       peCount = 0;
1014       thisProxy.report();
1015     }
1016   }
1017
1018   // for debugging, report checkpoint info
1019   void CkMemCheckPT::report()
1020   {
1021     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1022     inCheckpointing = 0;
1023 #if !CMK_CHKP_ALL
1024     int objsize = 0;
1025     int len = ckTable.length();
1026     for (int i=0; i<len; i++) {
1027       CkCheckPTInfo *entry = ckTable[i];
1028       CmiAssert(entry);
1029       objsize += entry->getSize();
1030     }
1031     CmiAssert(CpvAccess(procChkptBuf));
1032     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1033 #else
1034     if(CkMyPe()==0)
1035       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1036 #endif
1037   }
1038
1039   /*****************************************************************************
1040     RESTART Procedure
1041    *****************************************************************************/
1042
1043   // master processor of two buddies
1044   inline int CkMemCheckPT::isMaster(int buddype)
1045   {
1046 #if 0
1047     int mype = CkMyPe();
1048     //CkPrintf("ismaster: %d %d\n", pe, mype);
1049     if (CkNumPes() - totalFailed() == 2) {
1050       return mype > buddype;
1051     }
1052     for (int i=1; i<CkNumPes(); i++) {
1053       int me = (buddype+i)%CkNumPes();
1054       if (isFailed(me)) continue;
1055       if (me == mype) return 1;
1056       else return 0;
1057     }
1058     return 0;
1059 #else
1060     // smaller one
1061     int mype = CkMyPe();
1062     //CkPrintf("ismaster: %d %d\n", pe, mype);
1063     if (CkNumPes() - totalFailed() == 2) {
1064       return mype < buddype;
1065     }
1066 #if NODE_CHECKPOINT
1067     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1068     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1069 #else
1070       for (int i=1; i<CkNumPes(); i++) {
1071 #endif
1072         int me = (mype+i)%CkNumPes();
1073         if (isFailed(me)) continue;
1074         if (me == buddype) return 1;
1075         else return 0;
1076       }
1077       return 0;
1078 #endif
1079     }
1080
1081
1082
1083 #if 0
1084     // helper class to pup all elements that belong to same ckLocMgr
1085     class ElementDestoryer : public CkLocIterator {
1086       private:
1087         CkLocMgr *locMgr;
1088       public:
1089         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1090         void addLocation(CkLocation &loc) {
1091           CkArrayIndex idx=loc.getIndex();
1092           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1093           loc.destroy();
1094         }
1095     };
1096 #endif
1097
1098     // restore the bitmap vector for LB
1099     void CkMemCheckPT::resetLB(int diepe)
1100     {
1101 #if CMK_LBDB_ON
1102       int i;
1103       char *bitmap = new char[CkNumPes()];
1104       // set processor available bitmap
1105       get_avail_vector(bitmap);
1106       for (i=0; i<failedPes.length(); i++)
1107         bitmap[failedPes[i]] = 0; 
1108       bitmap[diepe] = 0;
1109
1110 #if CK_NO_PROC_POOL
1111       set_avail_vector(bitmap);
1112 #endif
1113
1114       // if I am the crashed pe, rebuild my failedPEs array
1115       if (CkMyNode() == diepe)
1116         for (i=0; i<CkNumPes(); i++) 
1117           if (bitmap[i]==0) failed(i);
1118
1119       delete [] bitmap;
1120 #endif
1121     }
1122
1123     static double restartT;
1124
1125     // in case when failedPe dies, everybody go through its checkpoint table:
1126     // destory all array elements
1127     // recover lost buddies
1128     // reconstruct all array elements from check point data
1129     // called on all processors
1130     void CkMemCheckPT::restart(int diePe)
1131     {
1132 #if CMK_MEM_CHECKPOINT
1133       double curTime = CmiWallTimer();
1134       if (CkMyPe() == diePe){
1135         restartT = CmiWallTimer();
1136         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1137       }
1138       stage = (char*)"resetLB";
1139       startTime = curTime;
1140       if (CkMyPe() == diePe)
1141         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1142
1143 #if CK_NO_PROC_POOL
1144       failed(diePe);    // add into the list of failed pes
1145 #endif
1146       thisFailedPe = diePe;
1147
1148       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1149
1150       inRestarting = 1;
1151
1152       // disable load balancer's barrier
1153       //if (CkMyPe() != diePe) resetLB(diePe);
1154
1155       CKLOCMGR_LOOP(mgr->startInserting(););
1156
1157
1158       if(CmiNumPartition()==1){
1159         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1160       }else{
1161         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1162         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1163       }
1164 #endif
1165     }
1166
1167     // loally remove all array elements
1168     void CkMemCheckPT::removeArrayElements()
1169     {
1170 #if CMK_MEM_CHECKPOINT
1171       int len = ckTable.length();
1172       double curTime = CmiWallTimer();
1173       if (CkMyPe() == thisFailedPe) 
1174         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1175       stage = (char*)"removeArrayElements";
1176       startTime = curTime;
1177
1178       //  if (cpCallback.isInvalid()) 
1179       //          CkPrintf("invalid pe %d\n",CkMyPe());
1180       //          CkAbort("Didn't set restart callback\n");;
1181       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1182
1183       // get rid of all buffering and remote recs
1184       // including destorying all array elements
1185 #if CK_NO_PROC_POOL  
1186       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1187 #else
1188       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1189 #endif
1190       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1191 #endif
1192     }
1193
1194     // flush state in reduction manager
1195     void CkMemCheckPT::resetReductionMgr()
1196     {
1197       if (CkMyPe() == thisFailedPe) 
1198         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1199       int numGroups = CkpvAccess(_groupIDTable)->size();
1200       for(int i=0;i<numGroups;i++) {
1201         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1202         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1203         obj->flushStates();
1204         obj->ckJustMigrated();
1205       }
1206       // reset again
1207       //CpvAccess(_qd)->flushStates();
1208       if(CmiNumPartition()==1){
1209         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1210       }
1211       else
1212         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1213     }
1214
1215     // recover the lost buddies
1216     void CkMemCheckPT::recoverBuddies()
1217     {
1218       int idx;
1219       int len = ckTable.length();
1220       // ready to flush reduction manager
1221       // cannot be CkMemCheckPT::restart because destory will modify states
1222       double curTime = CmiWallTimer();
1223       if (CkMyPe() == thisFailedPe)
1224         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1225       stage = (char *)"recoverBuddies";
1226       if (CkMyPe() == thisFailedPe)
1227         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1228       startTime = curTime;
1229
1230       // recover buddies
1231       expectCount = 0;
1232 #if !CMK_CHKP_ALL
1233       for (idx=0; idx<len; idx++) {
1234         CkCheckPTInfo *entry = ckTable[idx];
1235         if (entry->pNo == thisFailedPe) {
1236 #if CK_NO_PROC_POOL
1237           // find a new buddy
1238           int budPe = BuddyPE(CkMyPe());
1239 #else
1240           int budPe = thisFailedPe;
1241 #endif
1242           CkArrayCheckPTMessage *msg = entry->getCopy();
1243           msg->bud1 = budPe;
1244           msg->bud2 = CkMyPe();
1245           msg->cp_flag = 0;            // not checkpointing
1246           thisProxy[budPe].recoverEntry(msg);
1247           expectCount ++;
1248         }
1249       }
1250 #else
1251       //send to failed pe
1252       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1253 #if CK_NO_PROC_POOL
1254         // find a new buddy
1255         int budPe = BuddyPE(CkMyPe());
1256 #else
1257         int budPe = thisFailedPe;
1258 #endif
1259         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1260         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1261         msg->cp_flag = 0;            // not checkpointing
1262         msg->bud1 = budPe;
1263         msg->bud2 = CkMyPe();
1264         thisProxy[budPe].recoverEntry(msg);
1265         expectCount ++;
1266       }
1267 #endif
1268
1269       if (expectCount == 0) {
1270         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1271       }
1272       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1273     }
1274
1275     void CkMemCheckPT::gotData()
1276     {
1277       ackCount ++;
1278       if (ackCount == expectCount) {
1279         ackCount = 0;
1280         expectCount = -1;
1281         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1282         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1283       }
1284     }
1285
1286     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1287     {
1288
1289       for (int i=0; i<n; i++) {
1290         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1291         mgr->updateLocation(idx[i], nowOnPe);
1292       }
1293       thisProxy[nowOnPe].gotReply();
1294     }
1295
1296     // restore array elements
1297     void CkMemCheckPT::recoverArrayElements()
1298     {
1299       double curTime = CmiWallTimer();
1300       int len = ckTable.length();
1301       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1302       stage = (char *)"recoverArrayElements";
1303       if (CkMyPe() == thisFailedPe)
1304         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1305       startTime = curTime;
1306       int flag = 0;
1307       // recover all array elements
1308       int count = 0;
1309
1310 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1311       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1312       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1313 #endif
1314
1315 #if !CMK_CHKP_ALL
1316       for (int idx=0; idx<len; idx++)
1317       {
1318         CkCheckPTInfo *entry = ckTable[idx];
1319 #if CK_NO_PROC_POOL
1320         // the bigger one will do 
1321         //    if (CkMyPe() < entry->pNo) continue;
1322         if (!isMaster(entry->pNo)) continue;
1323 #else
1324         // smaller one do it, which has the original object
1325         if (CkMyPe() == entry->pNo+1 || 
1326             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1327 #endif
1328         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1329
1330         entry->updateBuddy(CkMyPe(), entry->pNo);
1331         CkArrayCheckPTMessage *msg = entry->getCopy();
1332         // gzheng
1333         //thisProxy[CkMyPe()].inmem_restore(msg);
1334         inmem_restore(msg);
1335 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1336         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1337         int homePe = mgr->homePe(msg->index);
1338         if (homePe != CkMyPe()) {
1339           gmap[homePe].push_back(msg->locMgr);
1340           imap[homePe].push_back(msg->index);
1341         }
1342 #endif
1343         CkFreeMsg(msg);
1344         count ++;
1345       }
1346 #else
1347       char * packData;
1348       if(CmiNumPartition()==1){
1349         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1350         packData = (char *)msg->packData;
1351       }
1352       else{
1353         int pointer = CpvAccess(curPointer);
1354         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1355         packData = msg->packData;
1356       }
1357 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1358       recoverAll(packData,gmap,imap);
1359 #else
1360       recoverAll(packData);
1361 #endif
1362 #endif
1363 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1364       for (int i=0; i<CkNumPes(); i++) {
1365         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1366           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1367           flag++;       
1368         }
1369       }
1370       delete [] imap;
1371       delete [] gmap;
1372 #endif
1373       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1374
1375       CKLOCMGR_LOOP(mgr->doneInserting(););
1376
1377       // _crashedNode = -1;
1378       CpvAccess(_crashedNode) = -1;
1379 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1380       if (CkMyPe() == 0)
1381         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1382 #else
1383       if(flag == 0)
1384       {
1385         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1386       }
1387 #endif
1388     }
1389
1390     void CkMemCheckPT::gotReply(){
1391       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1392     }
1393
1394     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1395 #if CMK_CHKP_ALL
1396       PUP::fromMem p(packData);
1397       int numElements;
1398       p|numElements;
1399       if(p.isUnpacking()){
1400         for(int i=0;i<numElements;i++){
1401           CkGroupID gID;
1402           CkArrayIndex idx;
1403           p|gID;
1404           p|idx;
1405           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1406           int homePe = mgr->homePe(idx);
1407 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1408           mgr->resume(idx,p,CmiTrue,CmiTrue);
1409 #else
1410           if(CmiNumPartition()==1)
1411             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1412           else{
1413             if(CkMyPe()==thisFailedPe){
1414               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1415             }
1416             else{
1417               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1418             }
1419           }
1420 #endif
1421 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1422           homePe = mgr->homePe(idx);
1423           if (homePe != CkMyPe()) {
1424             gmap[homePe].push_back(gID);
1425             imap[homePe].push_back(idx);
1426           }
1427 #endif
1428         }
1429       }
1430 #endif
1431     }
1432
1433
1434     // on every processor
1435     // turn load balancer back on
1436     void CkMemCheckPT::finishUp()
1437     {
1438       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1439       //CKLOCMGR_LOOP(mgr->doneInserting(););
1440
1441       if (CkMyPe() == thisFailedPe)
1442       {
1443         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1444         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1445         fflush(stdout);
1446       }
1447       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1448       inRestarting = 0;
1449
1450 #if CMK_CONVERSE_MPI    
1451       if(CmiNumPartition()!=1){
1452         CpvAccess(recvdProcChkp) = 0;
1453         CpvAccess(recvdArrayChkp) = 0;
1454         CpvAccess(curPointer)^=1;
1455         //notify my replica, restart is done
1456         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1457         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1458         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1459       }
1460       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1461         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1462       }
1463 #endif
1464
1465 #if CK_NO_PROC_POOL
1466 #if NODE_CHECKPOINT
1467       int numnodes = CmiNumPhysicalNodes();
1468 #else
1469       int numnodes = CkNumPes();
1470 #endif
1471       if (numnodes-totalFailed() <=2) {
1472         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1473         _memChkptOn = 0;
1474       }
1475 #endif
1476     }
1477
1478     void CkMemCheckPT::recoverFromSoftFailure()
1479     {
1480       inRestarting = 0;
1481       CpvAccess(recvdRemote) = 0;
1482       CpvAccess(recvdLocal) = 0;
1483       CpvAccess(localChkpDone) = 0;
1484       CpvAccess(remoteChkpDone) = 0;
1485       inCheckpointing = 0;
1486       notifyReplica = 0;
1487       if(CkMyPe() == 0){
1488         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1489       }
1490       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1491     }
1492     // called only on 0
1493     void CkMemCheckPT::quiescence(CkCallback &cb)
1494     {
1495       static int pe_count = 0;
1496       pe_count ++;
1497       CmiAssert(CkMyPe() == 0);
1498       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1499       if (pe_count == CkNumPes()) {
1500         pe_count = 0;
1501         cb.send();
1502       }
1503     }
1504
1505     // User callable function - to start a checkpoint
1506     // callback cb is used to pass control back
1507     void CkStartMemCheckpoint(CkCallback &cb)
1508     {
1509 #if CMK_MEM_CHECKPOINT
1510       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1511       /*if (_memChkptOn == 0) {
1512         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1513         cb.send();
1514         return;
1515         }*/
1516       if (CkInRestarting()) {
1517         // trying to checkpointing during restart
1518         cb.send();
1519         return;
1520       }
1521       // store user callback and user data
1522       CkMemCheckPT::cpCallback = cb;
1523
1524       // broadcast to start check pointing
1525       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1526       checkptMgr.doItNow(CkMyPe(), cb);
1527 #else
1528       // when mem checkpoint is disabled, invike cb immediately
1529       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1530       cb.send();
1531 #endif
1532     }
1533
1534     void CkRestartCheckPoint(int diePe)
1535     {
1536       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1537       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1538       // broadcast
1539       checkptMgr.restart(diePe);
1540     }
1541
1542     static int _diePE = -1;
1543
1544     // callback function used locally by ccs handler
1545     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1546     {
1547       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1548       CkRestartCheckPoint(_diePE);
1549     }
1550
1551
1552     // called on crashed PE
1553     static void restartBeginHandler(char *msg)
1554     {
1555       CmiFree(msg);
1556 #if CMK_MEM_CHECKPOINT
1557 #if CMK_USE_BARRIER
1558       if(CkMyPe()!=_diePE){
1559         printf("restar begin on %d\n",CkMyPe());
1560         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1561         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1562         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1563       }else{
1564         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1565         CkRestartCheckPointCallback(NULL, NULL);
1566       }
1567 #else
1568       static int count = 0;
1569       CmiAssert(CkMyPe() == _diePE);
1570       count ++;
1571       if (count == CkNumPes()) {
1572         printf("restart begin on %d\n",CkMyPe());
1573         CkRestartCheckPointCallback(NULL, NULL);
1574         count = 0;
1575       }
1576 #endif
1577 #endif
1578     }
1579
1580     extern void _discard_charm_message();
1581     extern void _resume_charm_message();
1582
1583     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1584       return data;
1585     }
1586
1587     static void restartBcastHandler(char *msg)
1588     {
1589 #if CMK_MEM_CHECKPOINT
1590       // advance phase counter
1591       CkMemCheckPT::inRestarting = 1;
1592       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1593       // gzheng
1594       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1595
1596       if (CkMyPe()==_diePE)
1597         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1598
1599       // reset QD counters
1600       /*  gzheng
1601           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1602        */
1603
1604       /*  gzheng
1605           if (CkMyPe()==_diePE)
1606           CkRestartCheckPointCallback(NULL, NULL);
1607        */
1608       CmiFree(msg);
1609
1610       _resume_charm_message();
1611
1612       // reduction
1613       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1614       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1615 #if CMK_USE_BARRIER
1616       //CmiPrintf("before reduce\n");   
1617       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1618       //CmiPrintf("after reduce\n");    
1619 #else
1620       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1621 #endif 
1622       checkpointed = 0;
1623 #endif
1624     }
1625
1626     extern void _initDone();
1627
1628     bool compare(char * buf1, char *buf2){
1629       PUP::checker pchecker(buf1,buf2);
1630       pchecker.skip();
1631       int numElements;
1632       pchecker|numElements;
1633       for(int i=0;i<numElements;i++){
1634         CkGroupID gID;
1635         CkArrayIndex idx;
1636
1637         pchecker|gID;
1638         pchecker|idx;
1639
1640         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1641         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1642       }
1643       return pchecker.getResult();
1644       //return true;
1645     }
1646     int getChecksum(char * buf){
1647       PUP::checker pchecker(buf);
1648       pchecker.skip();
1649       int numElements;
1650       pchecker|numElements;
1651 //      CkPrintf("num %d\n",numElements);
1652       for(int i=0;i<numElements;i++){
1653         CkGroupID gID;
1654         CkArrayIndex idx;
1655
1656         pchecker|gID;
1657         pchecker|idx;
1658
1659         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1660         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1661       }
1662       return pchecker.getChecksum();
1663     }
1664
1665     static void recvRemoteChkpHandler(char *msg){
1666 #if CMK_USE_CHECKSUM
1667       CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1668       CpvAccess(recvdRemote) = 1;
1669       if(CpvAccess(recvdLocal)==1){
1670         if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1671           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1672         }
1673         else{
1674           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1675         }
1676       }
1677 #else
1678       envelope *env = (envelope *)msg;
1679       CkUnpackMessage(&env);
1680       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1681       if(CpvAccess(recvdLocal)==1){
1682         int pointer = CpvAccess(curPointer);
1683         int size = CpvAccess(chkpBuf)[pointer]->len;
1684         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1685           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1686         }else
1687         {
1688           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1689         }
1690         delete chkpMsg;
1691         if(CkMyPe()==0)
1692           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1693       }else{
1694         CpvAccess(recvdRemote) = 1;
1695         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1696         CpvAccess(buddyBuf) = chkpMsg;
1697       }
1698 #endif
1699     }
1700
1701     static void replicaRecoverHandler(char *msg){
1702       CpvAccess(_remoteCrashedNode) = -1;
1703       CkMemCheckPT::replicaAlive = 1;
1704       //fflush(stdout);
1705       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1706       bool ret = true;
1707       CpvAccess(remoteChkpDone) = 1;
1708       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1709       CmiFree(msg);
1710
1711     }
1712     static void replicaChkpDoneHandler(char *msg){
1713       CpvAccess(remoteChkpDone) = 1;
1714       if(CpvAccess(localChkpDone) == 1)
1715         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1716       CmiFree(msg);
1717     }
1718
1719     static void replicaDieHandler(char * msg){
1720 #if CMK_CONVERSE_MPI    
1721       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1722       CpvAccess(_remoteCrashedNode) = diePe;
1723       CkMemCheckPT::replicaAlive = 0;
1724       if(CkMyPe()==diePe){
1725         CmiPrintf("pe %d in replicad word die\n",diePe);
1726         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1727         fflush(stdout);
1728       }
1729       find_spare_mpirank(diePe,CmiMyPartition()^1);
1730 #endif
1731       //broadcast to my partition to get local max iter
1732       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1733       CmiFree(msg);
1734     }
1735
1736
1737     static void replicaDieBcastHandler(char *msg){
1738       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1739       CpvAccess(_remoteCrashedNode) = diePe;
1740       CkMemCheckPT::replicaAlive = 0;
1741       CmiFree(msg);
1742     }
1743
1744     static void recoverRemoteProcDataHandler(char *msg){
1745       envelope *env = (envelope *)msg;
1746       CkUnpackMessage(&env);
1747       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1748
1749       //store the checkpoint
1750       int pointer = procMsg->pointer;
1751
1752
1753       if(CkMyPe()==CpvAccess(_crashedNode)){
1754         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1755         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1756         PUP::fromMem p(procMsg->packData);
1757         _handleProcData(p,CmiTrue);
1758         _initDone();
1759       }
1760       else{
1761         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1762         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1763         //_handleProcData(p,CmiFalse);
1764       }
1765       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1766       CKLOCMGR_LOOP(mgr->startInserting(););
1767
1768       CpvAccess(recvdProcChkp) =1;
1769       if(CpvAccess(recvdArrayChkp)==1){
1770         _resume_charm_message();
1771         _diePE = CpvAccess(_crashedNode);
1772         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1773         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1774         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1775       }
1776     }
1777
1778     static void recoverRemoteArrayDataHandler(char *msg){
1779       envelope *env = (envelope *)msg;
1780       CkUnpackMessage(&env);
1781       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1782
1783       //store the checkpoint
1784       int pointer = chkpMsg->pointer;
1785       CpvAccess(curPointer) = pointer;
1786       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1787       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1788       CpvAccess(recvdArrayChkp) =1;
1789       CkMemCheckPT::inRestarting = 1;
1790       if(CpvAccess(recvdProcChkp) == 1){
1791         _resume_charm_message();
1792         _diePE = CpvAccess(_crashedNode);
1793         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1794         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1795         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1796         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1797       }
1798     }
1799
1800     static void recvPhaseHandler(char * msg)
1801     {
1802       CpvAccess(_curRestartPhase)--;
1803       CkMemCheckPT::inRestarting = 1;
1804       CmiFree(msg);
1805     }
1806     // called on crashed processor
1807     static void recoverProcDataHandler(char *msg)
1808     {
1809 #if CMK_MEM_CHECKPOINT
1810       int i;
1811       envelope *env = (envelope *)msg;
1812       CkUnpackMessage(&env);
1813       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1814       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1815       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1816       PUP::fromMem p(procMsg->packData);
1817       _handleProcData(p,CmiTrue);
1818
1819       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1820       // gzheng
1821       CKLOCMGR_LOOP(mgr->startInserting(););
1822
1823       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1824       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1825       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1826       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1827
1828       _initDone();
1829       //   CpvAccess(_qd)->flushStates();
1830       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1831 #endif
1832     }
1833     //for replica, got the phase number from my neighbor processor in the current partition
1834     static void askPhaseHandler(char *msg)
1835     {
1836 #if CMK_MEM_CHECKPOINT
1837       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1838       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1839       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1840       CmiSetHandler(msg, recvPhaseHandlerIdx);
1841       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1842 #endif
1843     }
1844     // called on its backup processor
1845     // get backup message buffer and sent to crashed processor
1846     static void askProcDataHandler(char *msg)
1847     {
1848 #if CMK_MEM_CHECKPOINT
1849       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1850       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1851       if (CpvAccess(procChkptBuf) == NULL)  {
1852         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1853         CkAbort("no checkpoint found");
1854       }
1855       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1856       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1857
1858       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1859
1860       CkPackMessage(&env);
1861       CmiSetHandler(env, recoverProcDataHandlerIdx);
1862       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1863       CpvAccess(procChkptBuf) = NULL;
1864       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1865 #endif
1866     }
1867
1868     // called on PE 0
1869     void qd_callback(void *m)
1870     {
1871       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1872       fflush(stdout);
1873       CkFreeMsg(m);
1874       if(CmiNumPartition()==1){
1875 #ifdef CMK_SMP
1876         for(int i=0;i<CmiMyNodeSize();i++){
1877           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1878           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1879           CmiSetHandler(msg, askProcDataHandlerIdx);
1880           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1881           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1882         }
1883         return;
1884 #endif
1885         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1886         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1887         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1888         CmiSetHandler(msg, askProcDataHandlerIdx);
1889         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1890         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1891       }
1892       else{
1893         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1894         CmiSetHandler(msg, recvPhaseHandlerIdx);
1895         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1896       }
1897     }
1898
1899     // on crashed node
1900     void CkMemRestart(const char *dummy, CkArgMsg *args)
1901     {
1902 #if CMK_MEM_CHECKPOINT
1903       _diePE = CmiMyNode();
1904       CpvAccess( _crashedNode )= CmiMyNode();
1905       CkMemCheckPT::inRestarting = 1;
1906       _discard_charm_message();
1907
1908       /*if(CmiMyRank()==0){
1909         CkCallback cb(qd_callback);
1910         CkStartQD(cb);
1911         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1912         }*/
1913       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1914       if(CmiNumPartition()==1){
1915         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1916         restartT = CmiWallTimer();
1917         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1918         fflush(stdout);
1919         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1920         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1921         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1922         CmiSetHandler(msg, askProcDataHandlerIdx);
1923         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1924         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1925       }
1926       else{
1927         CkCallback cb(qd_callback);
1928         CkStartQD(cb);
1929       }
1930 #else
1931       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1932 #endif
1933     }
1934
1935     // can be called in other files
1936     // return true if it is in restarting
1937     extern "C"
1938       int CkInRestarting()
1939       {
1940 #if CMK_MEM_CHECKPOINT
1941         if (CpvAccess( _crashedNode)!=-1) return 1;
1942         // gzheng
1943         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1944         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1945         return CkMemCheckPT::inRestarting;
1946 #else
1947         return 0;
1948 #endif
1949       }
1950
1951     extern "C"
1952       int CkReplicaAlive()
1953       {
1954         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1955         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1956         return CkMemCheckPT::replicaAlive;
1957
1958         /*if(CkMemCheckPT::replicaDead==1)
1959           return 0;
1960           else          
1961           return 1;*/
1962       }
1963
1964     extern "C"
1965       int CkInCheckpointing()
1966       {
1967         return CkMemCheckPT::inCheckpointing;
1968       }
1969
1970     extern "C"
1971       void CkSetInLdb(){
1972 #if CMK_MEM_CHECKPOINT
1973         CkMemCheckPT::inLoadbalancing = 1;
1974 #endif
1975       }
1976
1977     extern "C"
1978       int CkInLdb(){
1979 #if CMK_MEM_CHECKPOINT
1980         return CkMemCheckPT::inLoadbalancing;
1981 #endif
1982         return 0;
1983       }
1984
1985     extern "C"
1986       void CkResetInLdb(){
1987 #if CMK_MEM_CHECKPOINT
1988         CkMemCheckPT::inLoadbalancing = 0;
1989 #endif
1990       }
1991
1992     /*****************************************************************************
1993       module initialization
1994      *****************************************************************************/
1995
1996     static int arg_where = CkCheckPoint_inMEM;
1997
1998 #if CMK_MEM_CHECKPOINT
1999     void init_memcheckpt(char **argv)
2000     {
2001       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2002         arg_where = CkCheckPoint_inDISK;
2003       }
2004       // initiliazing _crashedNode variable
2005       CpvInitialize(int, _crashedNode);
2006       CpvInitialize(int, _remoteCrashedNode);
2007       CpvAccess(_crashedNode) = -1;
2008       CpvAccess(_remoteCrashedNode) = -1;
2009       init_FI(argv);
2010     }
2011 #endif
2012
2013     class CkMemCheckPTInit: public Chare {
2014       public:
2015         CkMemCheckPTInit(CkArgMsg *m) {
2016 #if CMK_MEM_CHECKPOINT
2017           if (arg_where == CkCheckPoint_inDISK) {
2018             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2019           }
2020           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2021           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2022 #endif
2023         }
2024     };
2025
2026     static void notifyHandler(char *msg)
2027     {
2028 #if CMK_MEM_CHECKPOINT
2029       CmiFree(msg);
2030       /* immediately increase restart phase to filter old messages */
2031       CpvAccess(_curRestartPhase) ++;
2032       CpvAccess(_qd)->flushStates();
2033       _discard_charm_message();
2034
2035 #endif
2036     }
2037
2038     extern "C"
2039       void notify_crash(int node)
2040       {
2041 #ifdef CMK_MEM_CHECKPOINT
2042         CpvAccess( _crashedNode) = node;
2043 #ifdef CMK_SMP
2044         for(int i=0;i<CkMyNodeSize();i++){
2045           CpvAccessOther(_crashedNode,i)=node;
2046         }
2047 #endif
2048         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2049         CkMemCheckPT::inRestarting = 1;
2050
2051         // this may be in interrupt handler, send a message to reset QD
2052         int pe = CmiNodeFirst(CkMyNode());
2053         for(int i=0;i<CkMyNodeSize();i++){
2054           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2055           CmiSetHandler(msg, notifyHandlerIdx);
2056           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2057         }
2058 #endif
2059       }
2060
2061     extern "C" void (*notify_crash_fn)(int node);
2062
2063
2064 #if CMK_CONVERSE_MPI
2065     void buddyDieHandler(char *msg)
2066     {
2067 #if CMK_MEM_CHECKPOINT
2068       // notify
2069       CkMemCheckPT::inRestarting = 1;
2070       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2071       notify_crash(diepe);
2072       // send message to crash pe to let it restart
2073       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2074       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2075       int buddy = obj->BuddyPE(CmiMyPe());
2076       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2077       if (buddy == diepe)  {
2078         mpi_restart_crashed(diepe, newrank);
2079       }
2080 #endif
2081     }
2082
2083     void pingHandler(void *msg)
2084     {
2085       lastPingTime = CmiWallTimer();
2086       CmiFree(msg);
2087     }
2088
2089     void pingCheckHandler()
2090     {
2091 #if CMK_MEM_CHECKPOINT
2092       double now = CmiWallTimer();
2093       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2094         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2095         int i, pe, buddy;
2096         // tell everyone the buddy dies
2097         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2098         for (i = 1; i < CmiNumPes(); i++) {
2099           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2100           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2101         }
2102         buddy = pe;
2103         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2104         fflush(stdout);
2105         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2106         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2107         CmiSetHandler(msg, buddyDieHandlerIdx);
2108         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2109         //send to everyone in the other world
2110         if(CmiNumPartition()!=1){
2111           for(int i=0;i<CmiNumPes();i++){
2112             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2113             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2114             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2115             //fflush(stdout);
2116             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2117             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2118           }
2119         }
2120       }
2121         else 
2122           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2123 #endif
2124       }
2125
2126       void pingBuddy()
2127       {
2128 #if CMK_MEM_CHECKPOINT
2129         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2130         if (obj) {
2131           int buddy = obj->BuddyPE(CkMyPe());
2132           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2133           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2134           CmiSetHandler(msg, pingHandlerIdx);
2135           CmiGetRestartPhase(msg) = 9999;
2136           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2137         }
2138         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2139 #endif
2140       }
2141 #endif
2142
2143       // initproc
2144       void CkRegisterRestartHandler( )
2145       {
2146 #if CMK_MEM_CHECKPOINT
2147         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2148         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2149         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2150         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2151         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2152
2153         //for replica
2154         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2155         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2156         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2157         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2158         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2159         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2160         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2161         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2162         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2163
2164 #if CMK_CONVERSE_MPI
2165         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2166         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2167         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2168 #endif
2169
2170         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2171         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2172         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2173         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2174         CpvInitialize(int,curPointer);
2175         CpvInitialize(int,recvdLocal);
2176         CpvInitialize(int,localChkpDone);
2177         CpvInitialize(int,remoteChkpDone);
2178         CpvInitialize(int,recvdRemote);
2179         CpvInitialize(int,recvdProcChkp);
2180         CpvInitialize(int,localChecksum);
2181         CpvInitialize(int,remoteChecksum);
2182         CpvInitialize(int,recvdArrayChkp);
2183
2184         CpvAccess(procChkptBuf) = NULL;
2185         CpvAccess(buddyBuf) = NULL;
2186         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2187         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2188         CpvAccess(chkpBuf)[0] = NULL;
2189         CpvAccess(chkpBuf)[1] = NULL;
2190         CpvAccess(localProcChkpBuf)[0] = NULL;
2191         CpvAccess(localProcChkpBuf)[1] = NULL;
2192
2193         CpvAccess(curPointer) = 0;
2194         CpvAccess(recvdLocal) = 0;
2195         CpvAccess(localChkpDone) = 0;
2196         CpvAccess(remoteChkpDone) = 0;
2197         CpvAccess(recvdRemote) = 0;
2198         CpvAccess(recvdProcChkp) = 0;
2199         CpvAccess(localChecksum) = 0;
2200         CpvAccess(remoteChecksum) = 0;
2201         CpvAccess(recvdArrayChkp) = 0;
2202
2203         notify_crash_fn = notify_crash;
2204
2205 #if ! CMK_CONVERSE_MPI
2206         // print pid to kill
2207         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2208         //  sleep(4);
2209 #endif
2210 #endif
2211       }
2212
2213
2214       extern "C"
2215         int CkHasCheckpoints()
2216         {
2217           return checkpointed;
2218         }
2219
2220       /// @todo: the following definitions should be moved to a separate file containing
2221       // structures and functions about fault tolerance strategies
2222
2223       /**
2224        *  * @brief: function for killing a process                                             
2225        *   */
2226 #ifdef CMK_MEM_CHECKPOINT
2227 #if CMK_HAS_GETPID
2228       void killLocal(void *_dummy,double curWallTime){
2229         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2230         if(CmiWallTimer()<killTime-1){
2231           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2232         }else{ 
2233 #if CMK_CONVERSE_MPI
2234           CkDieNow();
2235 #else 
2236           kill(getpid(),SIGKILL);                                               
2237 #endif
2238         }              
2239       } 
2240 #else
2241       void killLocal(void *_dummy,double curWallTime){
2242         CmiAbort("kill() not supported!");
2243       }
2244 #endif
2245 #endif
2246
2247 #ifdef CMK_MEM_CHECKPOINT
2248       /**
2249        * @brief: reads the file with the kill information
2250        */
2251       void readKillFile(){
2252         FILE *fp=fopen(killFile,"r");
2253         if(!fp){
2254           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2255           return;
2256         }
2257         int proc;
2258         double sec;
2259         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2260           if(proc == CkMyNode() && CkMyRank() == 0){
2261             killTime = CmiWallTimer()+sec;
2262             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2263             CcdCallFnAfter(killLocal,NULL,sec*1000);
2264           }
2265         }
2266         fclose(fp);
2267       }
2268
2269 #if ! CMK_CONVERSE_MPI
2270       void CkDieNow()
2271       {
2272 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2273         // ignored for non-mpi version
2274         CmiPrintf("[%d] die now.\n", CmiMyPe());
2275         killTime = CmiWallTimer()+0.001;
2276         CcdCallFnAfter(killLocal,NULL,1);
2277 #endif
2278       }
2279 #endif
2280
2281 #endif
2282
2283 #include "CkMemCheckpoint.def.h"
2284
2285