add strong resilience
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
440   int elemCount = CkCountChkpSyncElements();
441   if(CkMyPe()==0)
442     startTime = CmiWallTimer();
443   if(elemCount == 0){
444     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
445   }
446 }
447
448 //when one replica failes, decide the next chkp iter
449
450 void CkMemCheckPT::recvIter(int iter){
451   if(maxIter<iter){
452     maxIter=iter;
453   }
454 }
455
456 void CkMemCheckPT::recvMaxIter(int iter){
457   localDecided = false;
458   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
459 }
460
461 void CkMemCheckPT::reachChkpIter(){
462   recvIterCount++;
463   elemCount = CkCountChkpSyncElements();
464   if(recvIterCount == elemCount){
465     recvIterCount = 0;
466     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
467   }
468 }
469
470 void CkMemCheckPT::startChkp(){
471   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
472   CkStartMemCheckpoint(cpCallback);
473 }
474
475 // called by checkpoint mgr to restore an array element
476 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
477 {
478 #if CMK_MEM_CHECKPOINT
479   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
480   // m->index.print();
481   PUP::fromMem p(m->packData);
482   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
483   CmiAssert(mgr);
484 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
485   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
486 #else
487   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
488 #endif
489
490   // find a list of array elements bound together
491   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
492   CmiAssert(elt);
493   CkLocRec_local *rec = elt->myRec;
494   CkVec<CkMigratable *> list;
495   mgr->migratableList(rec, list);
496   CmiAssert(list.length() > 0);
497   for (int l=0; l<list.length(); l++) {
498     elt = (ArrayElement *)list[l];
499     elt->budPEs[0] = m->bud1;
500     elt->budPEs[1] = m->bud2;
501     //    reset, may not needed now
502     // for now.
503     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
504       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
505       if (c) c->redNo = 0;
506     }
507   }
508 #endif
509 }
510
511 // return 1 if pe is a crashed processor
512 int CkMemCheckPT::isFailed(int pe)
513 {
514   for (int i=0; i<failedPes.length(); i++)
515     if (failedPes[i] == pe) return 1;
516   return 0;
517 }
518
519 // add pe into history list of all failed processors
520 void CkMemCheckPT::failed(int pe)
521 {
522   if (isFailed(pe)) return;
523   failedPes.push_back(pe);
524 }
525
526 int CkMemCheckPT::totalFailed()
527 {
528   return failedPes.length();
529 }
530
531 // create an checkpoint entry for array element of aid with index.
532 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
533 {
534   // error check, no duplicate
535   int idx, len = ckTable.size();
536   for (idx=0; idx<len; idx++) {
537     CkCheckPTInfo *entry = ckTable[idx];
538     if (index == entry->index) {
539       if (loc == entry->locMgr) {
540         // bindTo array elements
541         return;
542       }
543       // for array inheritance, the following check may fail
544       // because ArrayElement constructor of all superclasses are called
545       if (aid == entry->aid) {
546         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
547         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
548       }
549     }
550   }
551   CkCheckPTInfo *newEntry;
552   if (where == CkCheckPoint_inMEM)
553     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
554   else
555     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
556   ckTable.push_back(newEntry);
557   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
558 }
559
560 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
561 {
562 #if !CMK_CHKP_ALL       
563   int buddy = msg->bud1;
564   if (buddy == CkMyPe()) buddy = msg->bud2;
565   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
566   recvData(msg);
567   // ack
568   thisProxy[buddy].gotData();
569 #else
570   chkpTable[0] = NULL;
571   chkpTable[1] = NULL;
572   recvArrayCheckpoint(msg);
573   thisProxy[msg->bud2].gotData();
574 #endif
575 }
576
577 // loop through my checkpoint table and ask checkpointed array elements
578 // to send me checkpoint data.
579 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
580 void CkMemCheckPT::doItNow(int starter)
581 {
582   checkpointed = 1;
583   //cpCallback = cb;
584   cpStarter = starter;
585   inCheckpointing = 1;
586   if (CkMyPe() == cpStarter) {
587     startTime = CmiWallTimer();
588     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
589   }
590 #if CMK_CONVERSE_MPI
591   if(CmiNumPartition()==1)
592 #endif    
593   {
594
595 #if !CMK_CHKP_ALL
596     int len = ckTable.length();
597     for (int i=0; i<len; i++) {
598       CkCheckPTInfo *entry = ckTable[i];
599       // always let the bigger number processor send request
600       //if (CkMyPe() < entry->pNo) continue;
601       // always let the smaller number processor send request, may on same proc
602       if (!isMaster(entry->pNo)) continue;
603       // call inmem_checkpoint to the array element, ask it to send
604       // back checkpoint data via recvData().
605       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
606       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
607     }
608     // if my table is empty, then I am done
609     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
610 #else
611     startArrayCheckpoint();
612 #endif
613     sendProcData();
614   }
615 #if CMK_CONVERSE_MPI
616   else
617   {
618     startCheckpoint();
619   }
620 #endif
621   // pack and send proc level data
622 }
623
624 class MemElementPacker : public CkLocIterator{
625   private:
626     //    CkLocMgr *locMgr;
627     PUP::er &p;
628     std::map<CkHashCode,CkLocation> arrayMap;
629   public:
630     MemElementPacker(PUP::er &p_):p(p_){};
631     void addLocation(CkLocation &loc){
632       CkArrayIndexMax idx = loc.getIndex();
633       arrayMap[idx.hash()] = loc;
634     }
635     void writeCheckpoint(){
636       std::map<CkHashCode, CkLocation>::iterator it;
637       for(it = arrayMap.begin();it!=arrayMap.end();it++){
638         CkLocation loc = it->second;
639         CkLocMgr *locMgr = loc.getManager();
640         CkArrayIndexMax idx = loc.getIndex();
641         CkGroupID gID = locMgr->ckGetGroupID();
642         p|gID;
643         p|idx;
644         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
645       }
646     }
647 };
648
649 void pupAllElements(PUP::er &p){
650 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
651   int numElements;
652   if(!p.isUnpacking()){
653     numElements = CkCountArrayElements();
654   }
655   p | numElements;
656   if(!p.isUnpacking()){
657     MemElementPacker packer(p);
658     CKLOCMGR_LOOP(mgr->iterate(packer););
659     packer.writeCheckpoint();
660   }
661 #endif
662 }
663
664 void CkMemCheckPT::startArrayCheckpoint(){
665 #if CMK_CHKP_ALL
666   int size;
667   {
668     PUP::sizer psizer;
669     pupAllElements(psizer);
670     size = psizer.size();
671   }
672   int packSize = size/sizeof(double)+1;
673   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
674   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
675   msg->len = size;
676   msg->cp_flag = 1;
677   int budPEs[2];
678   msg->bud1=CkMyPe();
679   msg->bud2=ChkptOnPe(CkMyPe());
680   {
681     PUP::toMem p(msg->packData);
682     pupAllElements(p);
683   }
684   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
685   if(chkpTable[0]) delete chkpTable[0];
686   chkpTable[0] = msg;
687   //send the checkpoint to my 
688   recvCount++;
689 #endif
690 }
691
692 void CkMemCheckPT::startCheckpoint(){
693 #if CMK_CONVERSE_MPI
694   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
695     CkPrintf("in start checkpointing!!!!\n");
696   int size;
697   {
698     PUP::sizer p;
699     _handleProcData(p,CmiFalse);
700     size = p.size();
701   }
702   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
703   procMsg->len = size;
704   procMsg->cp_flag = 1;
705   {
706     PUP::toMem p(procMsg->packData);
707     _handleProcData(p,CmiFalse);
708   }
709   int pointer = CpvAccess(curPointer);
710   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
711   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
712
713   {
714     PUP::sizer psizer;
715     pupAllElements(psizer);
716     size = psizer.size();
717   }
718   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
719   msg->len = size;
720   msg->cp_flag = 1;
721   int checksum;
722   {
723     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
724       PUP::checker p(msg->packData);
725       pupAllElements(p);
726       checksum = p.getChecksum();
727     }else{  
728 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
729       PUP::toMem p(msg->packData);
730       pupAllElements(p);
731     }
732   }
733   pointer = CpvAccess(curPointer);
734   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
735   CpvAccess(chkpBuf)[pointer] = msg;
736   if(CkMyPe()==0)
737     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
738   if(CkReplicaAlive()==1){
739     CpvAccess(recvdLocal) = 1;
740     if(CpvAccess(use_checksum)){
741       CpvAccess(localChecksum) = checksum;
742       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
743       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
744       //only one reaplica will send
745       if(CmiMyPartition()==0){
746         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
747         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
748       }
749     }else{
750       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
751       CkPackMessage(&env);
752       if(CmiMyPartition()==0){
753         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
754         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
755       }
756     }
757     if(CmiMyPartition()==0){
758       notifyReplica = 1;
759       thisProxy[CkMyPe()].doneComparison(true);
760     }
761   }
762   if(CpvAccess(recvdRemote)==1){
763     //only partition 1 will do it
764     //compare the checkpoint 
765     int size = CpvAccess(chkpBuf)[pointer]->len;
766     if(CpvAccess(use_checksum)){
767       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
768         thisProxy[CkMyPe()].doneComparison(true);
769       }
770       else{
771         thisProxy[CkMyPe()].doneComparison(false);
772       }
773     }else{
774       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
775         thisProxy[CkMyPe()].doneComparison(true);
776       }
777       else{
778         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
779         thisProxy[CkMyPe()].doneComparison(false);
780       }
781     }
782     if(CkMyPe()==0)
783       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
784   }
785   else{
786     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
787       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
788       if(CpvAccess(remoteReady)==1){
789         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
790         {       
791           int pointer = CpvAccess(curPointer);
792           //send the proc data
793           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
794           procMsg->pointer = pointer;
795           envelope * env = (envelope *)(UsrToEnv(procMsg));
796           CkPackMessage(&env);
797           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
798           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
799           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
800             CkPrintf("[%d] sendProcdata\n",CkMyPe());
801           }
802         }
803         //send the array checkpoint data
804         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
805         msg->pointer = CpvAccess(curPointer);
806         envelope * env = (envelope *)(UsrToEnv(msg));
807         CkPackMessage(&env);
808         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
809         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
810         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
811           CkPrintf("[%d] sendArraydata\n",CkMyPe());
812       }else{
813         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
814           int pointer = CpvAccess(curPointer);
815           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
816           (CpvAccess(recoverProcBuf))->pointer = pointer;
817         }
818         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
819         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
820         CpvAccess(localReady) = 1;
821       }
822       //can continue work, no need to wait for my replica
823       int _ret = 1;
824       notifyReplica = 1;
825       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
826       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
827     }
828   }
829 #endif
830 }
831
832 void CkMemCheckPT::doneComparison(bool ret){
833   int _ret = 1;
834   if(!ret){
835     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
836     _ret = 0;
837   }else{
838     _ret = 1;
839   }
840   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
841   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
842 }
843
844 void CkMemCheckPT::doneRComparison(int ret){
845   //    if(CpvAccess(curPointer) == 0){
846   //if(ret==CkNumPes()){
847     CpvAccess(localChkpDone) = 1;
848     if(CpvAccess(remoteChkpDone) ==1){
849       thisProxy.doneBothComparison();
850     }else{
851       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
852     }
853     if(notifyReplica == 0){
854       //notify the replica am done
855       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
856       *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
857       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
858       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
859       notifyReplica = 1;
860     }
861  // }
862  /* else{
863     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
864     startTime = CmiWallTimer();
865     thisProxy.RollBack();
866   }*/
867 }
868
869 void CkMemCheckPT::doneBothComparison(){
870   CpvAccess(recvdRemote) = 0;
871   CpvAccess(remoteReady) = 0;
872   CpvAccess(localReady) = 0;
873   CpvAccess(recvdLocal) = 0;
874   CpvAccess(localChkpDone) = 0;
875   CpvAccess(remoteChkpDone) = 0;
876   CpvAccess(remoteStarted) = 0;
877   CpvAccess(localStarted) = 0;
878   CpvAccess(_remoteCrashedNode) = -1;
879   CkMemCheckPT::replicaAlive = 1;
880   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
881   CpvAccess(curPointer)^=1;
882   inCheckpointing = 0;
883   notifyReplica = 0;
884   if(CkMyPe() == 0){
885     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
886   }
887   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
888 }
889
890 void CkMemCheckPT::RollBack(){
891   //restore group data
892   checkpointed = 0;
893   CkMemCheckPT::inRestarting = 1;
894   int pointer = CpvAccess(curPointer)^1;//use the previous one
895   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
896   PUP::fromMem p(chkpMsg->packData);    
897
898   //destroy array elements
899   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
900   int numGroups = CkpvAccess(_groupIDTable)->size();
901   for(int i=0;i<numGroups;i++) {
902     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
903     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
904     obj->flushStates();
905     obj->ckJustMigrated();
906   }
907   //restore array elements
908
909   int numElements;
910   p|numElements;
911
912   if(p.isUnpacking()){
913     for(int i=0;i<numElements;i++){
914       CkGroupID gID;
915       CkArrayIndex idx;
916       p|gID;
917       p|idx;
918       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
919       CmiAssert(mgr);
920       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
921     }
922     }
923     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
924     contribute(cb);
925   }
926
927   void CkMemCheckPT::notifyReplicaDie(int pe){
928     replicaAlive = 0;
929     CpvAccess(_remoteCrashedNode) = pe;
930   }
931
932   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
933   {
934 #if CMK_CHKP_ALL
935     int idx = 1;
936     if(msg->bud1 == CkMyPe()){
937       idx = 0;
938     }
939     int isChkpting = msg->cp_flag;
940     if(isChkpting == 1){
941       if(chkpTable[idx]) delete chkpTable[idx];
942     }
943     chkpTable[idx] = msg;
944     if(isChkpting){
945       recvCount++;
946       if(recvCount == 2){
947         if (where == CkCheckPoint_inMEM) {
948           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
949         }
950         else if (where == CkCheckPoint_inDISK) {
951           // another barrier for finalize the writing using fsync
952           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
953           contribute(0,NULL,CkReduction::sum_int,localcb);
954         }
955         else
956           CmiAbort("Unknown checkpoint scheme");
957         recvCount = 0;
958       }
959     }
960 #endif
961   }
962
963   // don't handle array elements
964   static inline void _handleProcData(PUP::er &p, CmiBool create)
965   {
966     // save readonlys, and callback BTW
967     CkPupROData(p);
968
969     // save mainchares 
970     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
971
972 #ifndef CMK_CHARE_USE_PTR
973     // save non-migratable chare
974     CkPupChareData(p);
975 #endif
976
977     // save groups into Groups.dat
978     CkPupGroupData(p,create);
979
980     // save nodegroups into NodeGroups.dat
981     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
982   }
983
984   void CkMemCheckPT::sendProcData()
985   {
986     // find out size of buffer
987     int size;
988     {
989       PUP::sizer p;
990       _handleProcData(p,CmiTrue);
991       size = p.size();
992     }
993     int packSize = size;
994     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
995     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
996     {
997       PUP::toMem p(msg->packData);
998       _handleProcData(p,CmiTrue);
999     }
1000     msg->pe = CkMyPe();
1001     msg->len = size;
1002     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1003     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1004   }
1005
1006   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1007   {
1008     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1009     CpvAccess(procChkptBuf) = msg;
1010     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1011     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1012   }
1013
1014   // ArrayElement call this function to give us the checkpointed data
1015   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1016   {
1017     int len = ckTable.length();
1018     int idx;
1019     for (idx=0; idx<len; idx++) {
1020       CkCheckPTInfo *entry = ckTable[idx];
1021       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1022     }
1023     CkAssert(idx < len);
1024     int isChkpting = msg->cp_flag;
1025     ckTable[idx]->updateBuffer(msg);
1026     if (isChkpting) {
1027       // all my array elements have returned their inmem data
1028       // inform starter processor that I am done.
1029       recvCount ++;
1030       if (recvCount == ckTable.length()) {
1031         if (where == CkCheckPoint_inMEM) {
1032           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1033         }
1034         else if (where == CkCheckPoint_inDISK) {
1035           // another barrier for finalize the writing using fsync
1036           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1037           contribute(0,NULL,CkReduction::sum_int,localcb);
1038         }
1039         else
1040           CmiAbort("Unknown checkpoint scheme");
1041         recvCount = 0;
1042       } 
1043     }
1044   }
1045
1046   // only used in disk checkpointing
1047   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1048   {
1049     delete m;
1050 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1051     system("sync");
1052 #endif
1053     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1054   }
1055
1056   // only is called on cpStarter when checkpoint is done
1057   void CkMemCheckPT::cpFinish()
1058   {
1059     CmiAssert(CkMyPe() == cpStarter);
1060     peCount++;
1061     // now that all processors have finished, activate callback
1062     if (peCount == 2) 
1063     {
1064       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1065       peCount = 0;
1066       thisProxy.report();
1067     }
1068   }
1069
1070   // for debugging, report checkpoint info
1071   void CkMemCheckPT::report()
1072   {
1073     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1074     inCheckpointing = 0;
1075 #if !CMK_CHKP_ALL
1076     int objsize = 0;
1077     int len = ckTable.length();
1078     for (int i=0; i<len; i++) {
1079       CkCheckPTInfo *entry = ckTable[i];
1080       CmiAssert(entry);
1081       objsize += entry->getSize();
1082     }
1083     CmiAssert(CpvAccess(procChkptBuf));
1084     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1085 #else
1086     if(CkMyPe()==0)
1087       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1088 #endif
1089   }
1090
1091   /*****************************************************************************
1092     RESTART Procedure
1093    *****************************************************************************/
1094
1095   // master processor of two buddies
1096   inline int CkMemCheckPT::isMaster(int buddype)
1097   {
1098 #if 0
1099     int mype = CkMyPe();
1100     //CkPrintf("ismaster: %d %d\n", pe, mype);
1101     if (CkNumPes() - totalFailed() == 2) {
1102       return mype > buddype;
1103     }
1104     for (int i=1; i<CkNumPes(); i++) {
1105       int me = (buddype+i)%CkNumPes();
1106       if (isFailed(me)) continue;
1107       if (me == mype) return 1;
1108       else return 0;
1109     }
1110     return 0;
1111 #else
1112     // smaller one
1113     int mype = CkMyPe();
1114     //CkPrintf("ismaster: %d %d\n", pe, mype);
1115     if (CkNumPes() - totalFailed() == 2) {
1116       return mype < buddype;
1117     }
1118 #if NODE_CHECKPOINT
1119     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1120     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1121 #else
1122       for (int i=1; i<CkNumPes(); i++) {
1123 #endif
1124         int me = (mype+i)%CkNumPes();
1125         if (isFailed(me)) continue;
1126         if (me == buddype) return 1;
1127         else return 0;
1128       }
1129       return 0;
1130 #endif
1131     }
1132
1133
1134
1135 #if 0
1136     // helper class to pup all elements that belong to same ckLocMgr
1137     class ElementDestoryer : public CkLocIterator {
1138       private:
1139         CkLocMgr *locMgr;
1140       public:
1141         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1142         void addLocation(CkLocation &loc) {
1143           CkArrayIndex idx=loc.getIndex();
1144           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1145           loc.destroy();
1146         }
1147     };
1148 #endif
1149
1150     // restore the bitmap vector for LB
1151     void CkMemCheckPT::resetLB(int diepe)
1152     {
1153 #if CMK_LBDB_ON
1154       int i;
1155       char *bitmap = new char[CkNumPes()];
1156       // set processor available bitmap
1157       get_avail_vector(bitmap);
1158       for (i=0; i<failedPes.length(); i++)
1159         bitmap[failedPes[i]] = 0; 
1160       bitmap[diepe] = 0;
1161
1162 #if CK_NO_PROC_POOL
1163       set_avail_vector(bitmap);
1164 #endif
1165
1166       // if I am the crashed pe, rebuild my failedPEs array
1167       if (CkMyNode() == diepe)
1168         for (i=0; i<CkNumPes(); i++) 
1169           if (bitmap[i]==0) failed(i);
1170
1171       delete [] bitmap;
1172 #endif
1173     }
1174
1175     static double restartT;
1176
1177     // in case when failedPe dies, everybody go through its checkpoint table:
1178     // destory all array elements
1179     // recover lost buddies
1180     // reconstruct all array elements from check point data
1181     // called on all processors
1182     void CkMemCheckPT::restart(int diePe)
1183     {
1184 #if CMK_MEM_CHECKPOINT
1185       double curTime = CmiWallTimer();
1186       if (CkMyPe() == diePe){
1187         restartT = CmiWallTimer();
1188         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1189       }
1190       stage = (char*)"resetLB";
1191       startTime = curTime;
1192       if (CkMyPe() == diePe)
1193         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1194
1195 #if CK_NO_PROC_POOL
1196       failed(diePe);    // add into the list of failed pes
1197 #endif
1198       thisFailedPe = diePe;
1199
1200       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1201
1202       inRestarting = 1;
1203
1204       // disable load balancer's barrier
1205       //if (CkMyPe() != diePe) resetLB(diePe);
1206
1207       CKLOCMGR_LOOP(mgr->startInserting(););
1208
1209
1210       if(CmiNumPartition()==1){
1211         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1212       }else{
1213         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1214         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1215       }
1216 #endif
1217     }
1218
1219     // loally remove all array elements
1220     void CkMemCheckPT::removeArrayElements()
1221     {
1222 #if CMK_MEM_CHECKPOINT
1223       int len = ckTable.length();
1224       double curTime = CmiWallTimer();
1225       if (CkMyPe() == thisFailedPe) 
1226         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1227       stage = (char*)"removeArrayElements";
1228       startTime = curTime;
1229
1230       //  if (cpCallback.isInvalid()) 
1231       //          CkPrintf("invalid pe %d\n",CkMyPe());
1232       //          CkAbort("Didn't set restart callback\n");;
1233       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1234
1235       // get rid of all buffering and remote recs
1236       // including destorying all array elements
1237 #if CK_NO_PROC_POOL  
1238       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1239 #else
1240       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1241 #endif
1242       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1243 #endif
1244     }
1245
1246     // flush state in reduction manager
1247     void CkMemCheckPT::resetReductionMgr()
1248     {
1249       if (CkMyPe() == thisFailedPe) 
1250         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1251       int numGroups = CkpvAccess(_groupIDTable)->size();
1252       for(int i=0;i<numGroups;i++) {
1253         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1254         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1255         obj->flushStates();
1256         obj->ckJustMigrated();
1257       }
1258       // reset again
1259       //CpvAccess(_qd)->flushStates();
1260       if(CmiNumPartition()==1){
1261         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1262       }
1263       else
1264         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1265     }
1266
1267     // recover the lost buddies
1268     void CkMemCheckPT::recoverBuddies()
1269     {
1270       int idx;
1271       int len = ckTable.length();
1272       // ready to flush reduction manager
1273       // cannot be CkMemCheckPT::restart because destory will modify states
1274       double curTime = CmiWallTimer();
1275       if (CkMyPe() == thisFailedPe)
1276         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1277       stage = (char *)"recoverBuddies";
1278       if (CkMyPe() == thisFailedPe)
1279         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1280       startTime = curTime;
1281
1282       // recover buddies
1283       expectCount = 0;
1284 #if !CMK_CHKP_ALL
1285       for (idx=0; idx<len; idx++) {
1286         CkCheckPTInfo *entry = ckTable[idx];
1287         if (entry->pNo == thisFailedPe) {
1288 #if CK_NO_PROC_POOL
1289           // find a new buddy
1290           int budPe = BuddyPE(CkMyPe());
1291 #else
1292           int budPe = thisFailedPe;
1293 #endif
1294           CkArrayCheckPTMessage *msg = entry->getCopy();
1295           msg->bud1 = budPe;
1296           msg->bud2 = CkMyPe();
1297           msg->cp_flag = 0;            // not checkpointing
1298           thisProxy[budPe].recoverEntry(msg);
1299           expectCount ++;
1300         }
1301       }
1302 #else
1303       //send to failed pe
1304       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1305 #if CK_NO_PROC_POOL
1306         // find a new buddy
1307         int budPe = BuddyPE(CkMyPe());
1308 #else
1309         int budPe = thisFailedPe;
1310 #endif
1311         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1312         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1313         msg->cp_flag = 0;            // not checkpointing
1314         msg->bud1 = budPe;
1315         msg->bud2 = CkMyPe();
1316         thisProxy[budPe].recoverEntry(msg);
1317         expectCount ++;
1318       }
1319 #endif
1320
1321       if (expectCount == 0) {
1322         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1323       }
1324       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1325     }
1326
1327     void CkMemCheckPT::gotData()
1328     {
1329       ackCount ++;
1330       if (ackCount == expectCount) {
1331         ackCount = 0;
1332         expectCount = -1;
1333         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1334         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1335       }
1336     }
1337
1338     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1339     {
1340
1341       for (int i=0; i<n; i++) {
1342         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1343         mgr->updateLocation(idx[i], nowOnPe);
1344       }
1345       thisProxy[nowOnPe].gotReply();
1346     }
1347
1348     // restore array elements
1349     void CkMemCheckPT::recoverArrayElements()
1350     {
1351       double curTime = CmiWallTimer();
1352       int len = ckTable.length();
1353       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1354       stage = (char *)"recoverArrayElements";
1355       if (CkMyPe() == thisFailedPe)
1356         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1357       startTime = curTime;
1358       int flag = 0;
1359       // recover all array elements
1360       int count = 0;
1361
1362 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1363       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1364       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1365 #endif
1366
1367 #if !CMK_CHKP_ALL
1368       for (int idx=0; idx<len; idx++)
1369       {
1370         CkCheckPTInfo *entry = ckTable[idx];
1371 #if CK_NO_PROC_POOL
1372         // the bigger one will do 
1373         //    if (CkMyPe() < entry->pNo) continue;
1374         if (!isMaster(entry->pNo)) continue;
1375 #else
1376         // smaller one do it, which has the original object
1377         if (CkMyPe() == entry->pNo+1 || 
1378             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1379 #endif
1380         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1381
1382         entry->updateBuddy(CkMyPe(), entry->pNo);
1383         CkArrayCheckPTMessage *msg = entry->getCopy();
1384         // gzheng
1385         //thisProxy[CkMyPe()].inmem_restore(msg);
1386         inmem_restore(msg);
1387 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1388         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1389         int homePe = mgr->homePe(msg->index);
1390         if (homePe != CkMyPe()) {
1391           gmap[homePe].push_back(msg->locMgr);
1392           imap[homePe].push_back(msg->index);
1393         }
1394 #endif
1395         CkFreeMsg(msg);
1396         count ++;
1397       }
1398 #else
1399       char * packData;
1400       if(CmiNumPartition()==1){
1401         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1402         packData = (char *)msg->packData;
1403       }
1404       else{
1405         int pointer = CpvAccess(curPointer);
1406         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1407         packData = msg->packData;
1408       }
1409 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1410       recoverAll(packData,gmap,imap);
1411 #else
1412       recoverAll(packData);
1413 #endif
1414 #endif
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       for (int i=0; i<CkNumPes(); i++) {
1417         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1418           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1419           flag++;       
1420         }
1421       }
1422       delete [] imap;
1423       delete [] gmap;
1424 #endif
1425       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1426
1427       CKLOCMGR_LOOP(mgr->doneInserting(););
1428
1429       // _crashedNode = -1;
1430       CpvAccess(_crashedNode) = -1;
1431 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1432       if (CkMyPe() == 0)
1433         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1434 #else
1435       if(flag == 0)
1436       {
1437         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1438       }
1439 #endif
1440     }
1441
1442     void CkMemCheckPT::gotReply(){
1443       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1444     }
1445
1446     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1447 #if CMK_CHKP_ALL
1448       PUP::fromMem p(packData);
1449       int numElements;
1450       p|numElements;
1451       if(p.isUnpacking()){
1452         for(int i=0;i<numElements;i++){
1453           CkGroupID gID;
1454           CkArrayIndex idx;
1455           p|gID;
1456           p|idx;
1457           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1458           int homePe = mgr->homePe(idx);
1459 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1460           mgr->resume(idx,p,CmiTrue,CmiTrue);
1461 #else
1462           if(CmiNumPartition()==1)
1463             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1464           else{
1465             if(CkMyPe()==thisFailedPe){
1466               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1467             }
1468             else{
1469               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1470             }
1471           }
1472 #endif
1473 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1474           homePe = mgr->homePe(idx);
1475           if (homePe != CkMyPe()) {
1476             gmap[homePe].push_back(gID);
1477             imap[homePe].push_back(idx);
1478           }
1479 #endif
1480         }
1481       }
1482 #endif
1483     }
1484
1485
1486     // on every processor
1487     // turn load balancer back on
1488     void CkMemCheckPT::finishUp()
1489     {
1490       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1491       //CKLOCMGR_LOOP(mgr->doneInserting(););
1492
1493       if (CkMyPe() == thisFailedPe)
1494       {
1495         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1496         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1497         fflush(stdout);
1498       }
1499       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1500       inRestarting = 0;
1501
1502 #if CMK_CONVERSE_MPI    
1503       if(CmiNumPartition()!=1){
1504         CpvAccess(recvdProcChkp) = 0;
1505         CpvAccess(recvdArrayChkp) = 0;
1506         CpvAccess(curPointer)^=1;
1507         //notify my replica, restart is done
1508         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1509           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1510           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1511           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1512         }
1513       }
1514       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1515         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1516       }
1517 #endif
1518
1519 #if CK_NO_PROC_POOL
1520 #if NODE_CHECKPOINT
1521       int numnodes = CmiNumPhysicalNodes();
1522 #else
1523       int numnodes = CkNumPes();
1524 #endif
1525       if (numnodes-totalFailed() <=2) {
1526         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1527         _memChkptOn = 0;
1528       }
1529 #endif
1530     }
1531
1532     void CkMemCheckPT::recoverFromSoftFailure()
1533     {
1534       inRestarting = 0;
1535       CpvAccess(recvdRemote) = 0;
1536       CpvAccess(recvdLocal) = 0;
1537       CpvAccess(localChkpDone) = 0;
1538       CpvAccess(remoteChkpDone) = 0;
1539       inCheckpointing = 0;
1540       notifyReplica = 0;
1541       if(CkMyPe() == 0){
1542         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1543       }
1544       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1545     }
1546     // called only on 0
1547     void CkMemCheckPT::quiescence(CkCallback &cb)
1548     {
1549       static int pe_count = 0;
1550       pe_count ++;
1551       CmiAssert(CkMyPe() == 0);
1552       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1553       if (pe_count == CkNumPes()) {
1554         pe_count = 0;
1555         cb.send();
1556       }
1557     }
1558
1559     // User callable function - to start a checkpoint
1560     // callback cb is used to pass control back
1561     void CkStartMemCheckpoint(CkCallback &cb)
1562     {
1563 #if CMK_MEM_CHECKPOINT
1564       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1565       /*if (_memChkptOn == 0) {
1566         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1567         cb.send();
1568         return;
1569         }*/
1570       if (CkInRestarting()) {
1571         // trying to checkpointing during restart
1572         cb.send();
1573         return;
1574       }
1575       // store user callback and user data
1576       CkMemCheckPT::cpCallback = cb;
1577
1578
1579       //send to my replica that checkpoint begins 
1580       if(CkReplicaAlive()==1){
1581         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1582         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1583         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1584       }
1585       CpvAccess(localStarted) = 1;
1586       // broadcast to start check pointing
1587       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1588         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1589         checkptMgr.doItNow(CkMyPe());
1590       }
1591 #else
1592       // when mem checkpoint is disabled, invike cb immediately
1593       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1594       cb.send();
1595 #endif
1596     }
1597
1598     void CkRestartCheckPoint(int diePe)
1599     {
1600       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1601       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1602       // broadcast
1603       checkptMgr.restart(diePe);
1604     }
1605
1606     static int _diePE = -1;
1607
1608     // callback function used locally by ccs handler
1609     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1610     {
1611       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1612       CkRestartCheckPoint(_diePE);
1613     }
1614
1615
1616     // called on crashed PE
1617     static void restartBeginHandler(char *msg)
1618     {
1619       CmiFree(msg);
1620 #if CMK_MEM_CHECKPOINT
1621 #if CMK_USE_BARRIER
1622       if(CkMyPe()!=_diePE){
1623         printf("restar begin on %d\n",CkMyPe());
1624         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1625         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1626         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1627       }else{
1628         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1629         CkRestartCheckPointCallback(NULL, NULL);
1630       }
1631 #else
1632       static int count = 0;
1633       CmiAssert(CkMyPe() == _diePE);
1634       count ++;
1635       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1636         printf("restart begin on %d\n",CkMyPe());
1637         CkRestartCheckPointCallback(NULL, NULL);
1638         count = 0;
1639       }
1640 #endif
1641 #endif
1642     }
1643
1644     extern void _discard_charm_message();
1645     extern void _resume_charm_message();
1646
1647     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1648       return data;
1649     }
1650
1651     static void restartBcastHandler(char *msg)
1652     {
1653 #if CMK_MEM_CHECKPOINT
1654       // advance phase counter
1655       CkMemCheckPT::inRestarting = 1;
1656       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1657       // gzheng
1658       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1659
1660       if (CkMyPe()==_diePE)
1661         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1662
1663       // reset QD counters
1664       /*  gzheng
1665           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1666        */
1667
1668       /*  gzheng
1669           if (CkMyPe()==_diePE)
1670           CkRestartCheckPointCallback(NULL, NULL);
1671        */
1672       CmiFree(msg);
1673
1674       _resume_charm_message();
1675
1676       // reduction
1677       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1678       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1679 #if CMK_USE_BARRIER
1680       //CmiPrintf("before reduce\n");   
1681       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1682       //CmiPrintf("after reduce\n");    
1683 #else
1684       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1685 #endif 
1686       checkpointed = 0;
1687 #endif
1688     }
1689
1690     extern void _initDone();
1691
1692     bool compare(char * buf1, char *buf2){
1693       if(CkMyPe()==0)
1694         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1695       PUP::checker pchecker(buf1,buf2);
1696       pchecker.skip();
1697       int numElements;
1698       pchecker|numElements;
1699       for(int i=0;i<numElements;i++){
1700         CkGroupID gID;
1701         CkArrayIndex idx;
1702
1703         pchecker|gID;
1704         pchecker|idx;
1705
1706         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1707         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1708       }
1709       if(CkMyPe()==0)
1710         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1711       //int fault_num = pchecker.getFaultNum();
1712       bool result = pchecker.getResult();
1713       /*if(!result){
1714         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1715       }*/
1716       return result;
1717     }
1718     int getChecksum(char * buf){
1719       PUP::checker pchecker(buf);
1720       pchecker.skip();
1721       int numElements;
1722       pchecker|numElements;
1723 //      CkPrintf("num %d\n",numElements);
1724       for(int i=0;i<numElements;i++){
1725         CkGroupID gID;
1726         CkArrayIndex idx;
1727
1728         pchecker|gID;
1729         pchecker|idx;
1730
1731         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1732         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1733       }
1734       return pchecker.getChecksum();
1735     }
1736
1737     static void recvRemoteChkpHandler(char *msg){
1738       CpvAccess(remoteChkpDone) = 1;
1739       if(CpvAccess(use_checksum)){
1740         if(CkMyPe()==0)
1741           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1742         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1743         CpvAccess(recvdRemote) = 1;
1744         if(CpvAccess(recvdLocal)==1){
1745           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1746             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1747           }
1748           else{
1749             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1750           }
1751         }
1752       }else{
1753         envelope *env = (envelope *)msg;
1754         CkUnpackMessage(&env);
1755         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1756         if(CpvAccess(recvdLocal)==1){
1757           int pointer = CpvAccess(curPointer);
1758           int size = CpvAccess(chkpBuf)[pointer]->len;
1759           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1760             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1761           }else
1762           {
1763             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1764           }
1765           delete chkpMsg;
1766           if(CkMyPe()==0)
1767             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1768         }else{
1769           CpvAccess(recvdRemote) = 1;
1770           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1771           CpvAccess(buddyBuf) = chkpMsg;
1772         }
1773       }
1774     }
1775
1776     static void replicaRecoverHandler(char *msg){
1777       //fflush(stdout);
1778       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1779       CpvAccess(remoteChkpDone) = 1;
1780       if(CpvAccess(localChkpDone) == 1)
1781         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1782       CmiFree(msg);
1783     }
1784
1785     static void replicaChkpDoneHandler(char *msg){
1786       CpvAccess(remoteChkpDone) = 1;
1787       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1788       if(CpvAccess(localChkpDone) == 1)
1789         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1790       CmiFree(msg);
1791     }
1792
1793     static void replicaDieHandler(char * msg){
1794 #if CMK_CONVERSE_MPI
1795       //broadcast to every one in my replica
1796       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1797       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1798 #endif
1799     }
1800
1801     static void replicaChkpStartHandler(char * msg){
1802       CpvAccess(remoteStarted) =1;
1803       if(CpvAccess(localStarted)==1){    
1804         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1805         checkptMgr.doItNow(0);
1806       }
1807     }
1808
1809
1810     static void replicaDieBcastHandler(char *msg){
1811       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1812       CpvAccess(_remoteCrashedNode) = diePe;
1813       if(CkMyPe()==diePe){
1814         CmiPrintf("pe %d in replicad word die\n",diePe);
1815         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1816         fflush(stdout);
1817       }
1818       find_spare_mpirank(diePe,CmiMyPartition()^1);
1819       //broadcast to my partition to get local max iter
1820       if(CpvAccess(resilience)!=1){
1821         CkMemCheckPT::replicaAlive = 0;
1822         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1823       }
1824       CmiFree(msg);
1825     }
1826
1827     static void recoverRemoteProcDataHandler(char *msg){
1828       envelope *env = (envelope *)msg;
1829       CkUnpackMessage(&env);
1830       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1831
1832       //store the checkpoint
1833       int pointer = procMsg->pointer;
1834
1835
1836       if(CkMyPe()==CpvAccess(_crashedNode)){
1837         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1838         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1839         PUP::fromMem p(procMsg->packData);
1840         _handleProcData(p,CmiTrue);
1841         _initDone();
1842       }
1843       else{
1844         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1845         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1846         //_handleProcData(p,CmiFalse);
1847       }
1848       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1849       CKLOCMGR_LOOP(mgr->startInserting(););
1850
1851       CpvAccess(recvdProcChkp) =1;
1852       if(CpvAccess(recvdArrayChkp)==1){
1853         _resume_charm_message();
1854         _diePE = CpvAccess(_crashedNode);
1855         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1856         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1857         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1858       }
1859     }
1860
1861     static void recoverRemoteArrayDataHandler(char *msg){
1862       envelope *env = (envelope *)msg;
1863       CkUnpackMessage(&env);
1864       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1865
1866       //store the checkpoint
1867       int pointer = chkpMsg->pointer;
1868       CpvAccess(curPointer) = pointer;
1869       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1870       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1871       CpvAccess(recvdArrayChkp) =1;
1872       CkMemCheckPT::inRestarting = 1;
1873       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1874         _resume_charm_message();
1875         _diePE = CpvAccess(_crashedNode);
1876         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1877         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1878         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1879         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1880       }
1881     }
1882
1883     static void recvPhaseHandler(char * msg)
1884     {
1885       CpvAccess(_curRestartPhase)--;
1886       CkMemCheckPT::inRestarting = 1;
1887       CmiFree(msg);
1888       //notify the buddy in the replica now i can receive the checkpoint msg
1889       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1890         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1891         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1892         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1893         //timer start
1894         if(CpvAccess(_crashedNode)==CkMyPe())
1895           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1896         if(CpvAccess(_crashedNode)==CmiMyPe())
1897           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1898       }else{
1899         CpvAccess(curPointer)^=1;
1900         CkMemCheckPT::inRestarting = 1;
1901         _resume_charm_message();
1902         _diePE = CpvAccess(_crashedNode);
1903       }
1904     }
1905     
1906     static void askRecoverDataHandler(char * msg){
1907       if(CpvAccess(resilience)!=1){
1908         CpvAccess(remoteReady)=1;
1909         if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1910           CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1911         if(CpvAccess(localReady)==1){
1912           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1913           {     
1914             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1915             CkPackMessage(&env);
1916             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1917             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1918             CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
1919           }
1920           //send the array checkpoint data
1921           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1922           CkPackMessage(&env);
1923           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1924           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1925           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1926             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1927         }
1928       }else{
1929         int pointer = CpvAccess(curPointer)^1;
1930         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1931         procMsg->pointer = pointer;
1932         envelope * env = (envelope *)(UsrToEnv(procMsg));
1933         CkPackMessage(&env);
1934         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1935         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1936         CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
1937         
1938         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1939         arrayMsg->pointer = pointer;
1940         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1941         CkPackMessage(&env1);
1942         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1943         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1944         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1945       }
1946     }
1947     // called on crashed processor
1948     static void recoverProcDataHandler(char *msg)
1949     {
1950 #if CMK_MEM_CHECKPOINT
1951       int i;
1952       envelope *env = (envelope *)msg;
1953       CkUnpackMessage(&env);
1954       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1955       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1956       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1957       PUP::fromMem p(procMsg->packData);
1958       _handleProcData(p,CmiTrue);
1959
1960       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1961       // gzheng
1962       CKLOCMGR_LOOP(mgr->startInserting(););
1963
1964       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1965       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1966       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1967       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1968
1969       _initDone();
1970       //   CpvAccess(_qd)->flushStates();
1971       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1972 #endif
1973     }
1974     //for replica, got the phase number from my neighbor processor in the current partition
1975     static void askPhaseHandler(char *msg)
1976     {
1977 #if CMK_MEM_CHECKPOINT
1978       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1979       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1980       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1981       CmiSetHandler(msg, recvPhaseHandlerIdx);
1982       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1983 #endif
1984     }
1985     // called on its backup processor
1986     // get backup message buffer and sent to crashed processor
1987     static void askProcDataHandler(char *msg)
1988     {
1989 #if CMK_MEM_CHECKPOINT
1990       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1991       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1992       if (CpvAccess(procChkptBuf) == NULL)  {
1993         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1994         CkAbort("no checkpoint found");
1995       }
1996       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1997       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1998
1999       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2000
2001       CkPackMessage(&env);
2002       CmiSetHandler(env, recoverProcDataHandlerIdx);
2003       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2004       CpvAccess(procChkptBuf) = NULL;
2005       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2006 #endif
2007     }
2008
2009     // called on PE 0
2010     void qd_callback(void *m)
2011     {
2012       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2013       fflush(stdout);
2014       CkFreeMsg(m);
2015       if(CmiNumPartition()==1){
2016 #ifdef CMK_SMP
2017         for(int i=0;i<CmiMyNodeSize();i++){
2018           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2019           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2020           CmiSetHandler(msg, askProcDataHandlerIdx);
2021           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2022           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2023         }
2024         return;
2025 #endif
2026         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2027         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2028         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2029         CmiSetHandler(msg, askProcDataHandlerIdx);
2030         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2031         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2032       }
2033       else{
2034         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2035         CmiSetHandler(msg, recvPhaseHandlerIdx);
2036         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2037       }
2038     }
2039
2040     // on crashed node
2041     void CkMemRestart(const char *dummy, CkArgMsg *args)
2042     {
2043 #if CMK_MEM_CHECKPOINT
2044       _diePE = CmiMyNode();
2045       CpvAccess( _crashedNode )= CmiMyNode();
2046       CkMemCheckPT::inRestarting = 1;
2047       _discard_charm_message();
2048
2049       /*if(CmiMyRank()==0){
2050         CkCallback cb(qd_callback);
2051         CkStartQD(cb);
2052         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2053         }*/
2054       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2055       if(CmiNumPartition()==1){
2056         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2057         restartT = CmiWallTimer();
2058         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2059         fflush(stdout);
2060         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2061         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2062         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2063         CmiSetHandler(msg, askProcDataHandlerIdx);
2064         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2065         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2066       }
2067       else{
2068         CkCallback cb(qd_callback);
2069         CkStartQD(cb);
2070       }
2071 #else
2072       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2073 #endif
2074     }
2075
2076     // can be called in other files
2077     // return true if it is in restarting
2078     extern "C"
2079       int CkInRestarting()
2080       {
2081 #if CMK_MEM_CHECKPOINT
2082         if (CpvAccess( _crashedNode)!=-1) return 1;
2083         // gzheng
2084         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2085         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2086         return CkMemCheckPT::inRestarting;
2087 #else
2088         return 0;
2089 #endif
2090       }
2091
2092     extern "C"
2093       int CkReplicaAlive()
2094       {
2095         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2096         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2097         return CkMemCheckPT::replicaAlive;
2098
2099         /*if(CkMemCheckPT::replicaDead==1)
2100           return 0;
2101           else          
2102           return 1;*/
2103       }
2104
2105     extern "C"
2106       int CkInCheckpointing()
2107       {
2108         return CkMemCheckPT::inCheckpointing;
2109       }
2110
2111     extern "C"
2112       void CkSetInLdb(){
2113 #if CMK_MEM_CHECKPOINT
2114         CkMemCheckPT::inLoadbalancing = 1;
2115 #endif
2116       }
2117
2118     extern "C"
2119       int CkInLdb(){
2120 #if CMK_MEM_CHECKPOINT
2121         return CkMemCheckPT::inLoadbalancing;
2122 #endif
2123         return 0;
2124       }
2125
2126     extern "C"
2127       void CkResetInLdb(){
2128 #if CMK_MEM_CHECKPOINT
2129         CkMemCheckPT::inLoadbalancing = 0;
2130 #endif
2131       }
2132
2133     /*****************************************************************************
2134       module initialization
2135      *****************************************************************************/
2136
2137     static int arg_where = CkCheckPoint_inMEM;
2138
2139 #if CMK_MEM_CHECKPOINT
2140     void init_memcheckpt(char **argv)
2141     {
2142       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2143         arg_where = CkCheckPoint_inDISK;
2144       }
2145       CpvInitialize(int, use_checksum);
2146       CpvInitialize(int, resilience);
2147       CpvAccess(use_checksum)=0;
2148       CpvAccess(resilience)=0;
2149       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2150         CpvAccess(use_checksum)=1;
2151       }
2152       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2153         CpvAccess(resilience)=1;
2154       }
2155       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2156         CpvAccess(resilience)=2;
2157       }
2158       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2159         CpvAccess(resilience)=3;
2160       }
2161       // initiliazing _crashedNode variable
2162       CpvInitialize(int, _crashedNode);
2163       CpvInitialize(int, _remoteCrashedNode);
2164       CpvAccess(_crashedNode) = -1;
2165       CpvAccess(_remoteCrashedNode) = -1;
2166       init_FI(argv);
2167     }
2168 #endif
2169
2170     class CkMemCheckPTInit: public Chare {
2171       public:
2172         CkMemCheckPTInit(CkArgMsg *m) {
2173 #if CMK_MEM_CHECKPOINT
2174           if (arg_where == CkCheckPoint_inDISK) {
2175             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2176           }
2177           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2178           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2179 #endif
2180         }
2181     };
2182
2183     static void notifyHandler(char *msg)
2184     {
2185 #if CMK_MEM_CHECKPOINT
2186       CmiFree(msg);
2187       /* immediately increase restart phase to filter old messages */
2188       CpvAccess(_curRestartPhase) ++;
2189       CpvAccess(_qd)->flushStates();
2190       _discard_charm_message();
2191
2192 #endif
2193     }
2194
2195     extern "C"
2196       void notify_crash(int node)
2197       {
2198 #ifdef CMK_MEM_CHECKPOINT
2199         CpvAccess( _crashedNode) = node;
2200 #ifdef CMK_SMP
2201         for(int i=0;i<CkMyNodeSize();i++){
2202           CpvAccessOther(_crashedNode,i)=node;
2203         }
2204 #endif
2205         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2206         CkMemCheckPT::inRestarting = 1;
2207
2208         // this may be in interrupt handler, send a message to reset QD
2209         int pe = CmiNodeFirst(CkMyNode());
2210         for(int i=0;i<CkMyNodeSize();i++){
2211           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2212           CmiSetHandler(msg, notifyHandlerIdx);
2213           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2214         }
2215 #endif
2216       }
2217
2218     extern "C" void (*notify_crash_fn)(int node);
2219
2220
2221 #if CMK_CONVERSE_MPI
2222     void buddyDieHandler(char *msg)
2223     {
2224 #if CMK_MEM_CHECKPOINT
2225       // notify
2226       CkMemCheckPT::inRestarting = 1;
2227       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2228       notify_crash(diepe);
2229       // send message to crash pe to let it restart
2230       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2231       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2232       int buddy = obj->BuddyPE(CmiMyPe());
2233       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2234       if (buddy == diepe)  {
2235         mpi_restart_crashed(diepe, newrank);
2236       }
2237 #endif
2238     }
2239
2240     void pingHandler(void *msg)
2241     {
2242       lastPingTime = CmiWallTimer();
2243       CmiFree(msg);
2244     }
2245
2246     void pingCheckHandler()
2247     {
2248 #if CMK_MEM_CHECKPOINT
2249       double now = CmiWallTimer();
2250       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2251         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2252         int i, pe, buddy;
2253         // tell everyone the buddy dies
2254         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2255         for (i = 1; i < CmiNumPes(); i++) {
2256           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2257           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2258         }
2259         buddy = pe;
2260         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2261         fflush(stdout);
2262         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2263         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2264         CmiSetHandler(msg, buddyDieHandlerIdx);
2265         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2266         //send to everyone in the other world
2267         if(CmiNumPartition()!=1){
2268           //for(int i=0;i<CmiNumPes();i++){
2269             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2270             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2271             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2272             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2273           //}
2274         }
2275       }
2276         else 
2277           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2278 #endif
2279       }
2280
2281       void pingBuddy()
2282       {
2283 #if CMK_MEM_CHECKPOINT
2284         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2285         if (obj) {
2286           int buddy = obj->BuddyPE(CkMyPe());
2287           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2288           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2289           CmiSetHandler(msg, pingHandlerIdx);
2290           CmiGetRestartPhase(msg) = 9999;
2291           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2292         }
2293         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2294 #endif
2295       }
2296 #endif
2297
2298       // initproc
2299       void CkRegisterRestartHandler( )
2300       {
2301 #if CMK_MEM_CHECKPOINT
2302         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2303         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2304         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2305         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2306         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2307         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2308
2309         //for replica
2310         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2311         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2312         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2313         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2314         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2315         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2316         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2317         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2318         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2319         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2320
2321 #if CMK_CONVERSE_MPI
2322         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2323         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2324         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2325 #endif
2326
2327         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2328         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2329         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2330         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2331         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2332         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2333         CpvInitialize(int,curPointer);
2334         CpvInitialize(int,recvdLocal);
2335         CpvInitialize(int,localChkpDone);
2336         CpvInitialize(int,remoteChkpDone);
2337         CpvInitialize(int,remoteStarted);
2338         CpvInitialize(int,localStarted);
2339         CpvInitialize(int,localReady);
2340         CpvInitialize(int,remoteReady);
2341         CpvInitialize(int,recvdRemote);
2342         CpvInitialize(int,recvdProcChkp);
2343         CpvInitialize(int,localChecksum);
2344         CpvInitialize(int,remoteChecksum);
2345         CpvInitialize(int,recvdArrayChkp);
2346
2347         CpvAccess(procChkptBuf) = NULL;
2348         CpvAccess(buddyBuf) = NULL;
2349         CpvAccess(recoverProcBuf) = NULL;
2350         CpvAccess(recoverArrayBuf) = NULL;
2351         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2352         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2353         CpvAccess(chkpBuf)[0] = NULL;
2354         CpvAccess(chkpBuf)[1] = NULL;
2355         CpvAccess(localProcChkpBuf)[0] = NULL;
2356         CpvAccess(localProcChkpBuf)[1] = NULL;
2357
2358         CpvAccess(curPointer) = 0;
2359         CpvAccess(recvdLocal) = 0;
2360         CpvAccess(localChkpDone) = 0;
2361         CpvAccess(remoteChkpDone) = 0;
2362         CpvAccess(remoteStarted) = 0;
2363         CpvAccess(localStarted) = 0;
2364         CpvAccess(localReady) = 0;
2365         CpvAccess(remoteReady) = 0;
2366         CpvAccess(recvdRemote) = 0;
2367         CpvAccess(recvdProcChkp) = 0;
2368         CpvAccess(localChecksum) = 0;
2369         CpvAccess(remoteChecksum) = 0;
2370         CpvAccess(recvdArrayChkp) = 0;
2371
2372         notify_crash_fn = notify_crash;
2373
2374 #if ! CMK_CONVERSE_MPI
2375         // print pid to kill
2376         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2377         //  sleep(4);
2378 #endif
2379 #endif
2380       }
2381
2382
2383       extern "C"
2384         int CkHasCheckpoints()
2385         {
2386           return checkpointed;
2387         }
2388
2389       /// @todo: the following definitions should be moved to a separate file containing
2390       // structures and functions about fault tolerance strategies
2391
2392       /**
2393        *  * @brief: function for killing a process                                             
2394        *   */
2395 #ifdef CMK_MEM_CHECKPOINT
2396 #if CMK_HAS_GETPID
2397       void killLocal(void *_dummy,double curWallTime){
2398         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2399         if(CmiWallTimer()<killTime-1){
2400           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2401         }else{ 
2402 #if CMK_CONVERSE_MPI
2403           CkDieNow();
2404 #else 
2405           kill(getpid(),SIGKILL);                                               
2406 #endif
2407         }              
2408       } 
2409 #else
2410       void killLocal(void *_dummy,double curWallTime){
2411         CmiAbort("kill() not supported!");
2412       }
2413 #endif
2414 #endif
2415
2416 #ifdef CMK_MEM_CHECKPOINT
2417       /**
2418        * @brief: reads the file with the kill information
2419        */
2420       void readKillFile(){
2421         FILE *fp=fopen(killFile,"r");
2422         if(!fp){
2423           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2424           return;
2425         }
2426         int proc;
2427         double sec;
2428         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2429           if(proc == CkMyNode() && CkMyRank() == 0){
2430             killTime = CmiWallTimer()+sec;
2431             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2432             CcdCallFnAfter(killLocal,NULL,sec*1000);
2433           }
2434         }
2435         fclose(fp);
2436       }
2437
2438 #if ! CMK_CONVERSE_MPI
2439       void CkDieNow()
2440       {
2441 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2442         // ignored for non-mpi version
2443         CmiPrintf("[%d] die now.\n", CmiMyPe());
2444         killTime = CmiWallTimer()+0.001;
2445         CcdCallFnAfter(killLocal,NULL,1);
2446 #endif
2447       }
2448 #endif
2449
2450 #endif
2451
2452 #include "CkMemCheckpoint.def.h"
2453
2454