bug fix: handle failure detected during checkpoint
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
225   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
226 }
227 #endif
228
229 // entry function invoked by checkpoint mgr asking for checkpoint data
230 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
231 #if CMK_MEM_CHECKPOINT
232   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
233   //char index[128];   thisIndexMax.sprint(index);
234   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
235   CkLocMgr *locMgr = thisArray->getLocMgr();
236   CmiAssert(myRec!=NULL);
237   int size;
238   {
239     PUP::sizer p;
240     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
241     size = p.size();
242   }
243   int packSize = size/sizeof(double) +1;
244   CkArrayCheckPTMessage *msg =
245     new (packSize, 0) CkArrayCheckPTMessage;
246   msg->len = size;
247   msg->index =thisIndexMax;
248   msg->aid = thisArrayID;
249   msg->locMgr = locMgr->getGroupID();
250   msg->cp_flag = 1;
251   {
252     PUP::toMem p(msg->packData);
253     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
254   }
255
256   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
257   checkptMgr.recvData(msg, 2, budPEs);
258   delete m;
259 #endif
260 }
261
262 // checkpoint holder class - for memory checkpointing
263 class CkMemCheckPTInfo: public CkCheckPTInfo
264 {
265   CkArrayCheckPTMessage *ckBuffer;
266   public:
267   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
268     CkCheckPTInfo(a, loc, idx, pno)
269   {
270     ckBuffer = NULL;
271   }
272   ~CkMemCheckPTInfo() 
273   {
274     if (ckBuffer) delete ckBuffer; 
275   }
276   inline void updateBuffer(CkArrayCheckPTMessage *data) 
277   {
278     CmiAssert(data!=NULL);
279     if (ckBuffer) delete ckBuffer;
280     ckBuffer = data;
281   }    
282   inline CkArrayCheckPTMessage * getCopy()
283   {
284     if (ckBuffer == NULL) {
285       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
286       CmiAbort("Abort!");
287     }
288     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
289   }     
290   inline void updateBuddy(int b1, int b2) {
291     CmiAssert(ckBuffer);
292     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
293     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
294     CmiAssert(pNo != CkMyPe());
295   }
296   inline int getSize() { 
297     CmiAssert(ckBuffer);
298     return ckBuffer->len; 
299   }
300 };
301
302 // checkpoint holder class - for in-disk checkpointing
303 class CkDiskCheckPTInfo: public CkCheckPTInfo 
304 {
305   char *fname;
306   int bud1, bud2;
307   int len;                      // checkpoint size
308   public:
309   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
310   {
311 #if CMK_USE_MKSTEMP
312     fname = new char[64];
313     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
314     mkstemp(fname);
315 #else
316     fname=tmpnam(NULL);
317 #endif
318     bud1 = bud2 = -1;
319     len = 0;
320   }
321   ~CkDiskCheckPTInfo() 
322   {
323     remove(fname);
324   }
325   inline void updateBuffer(CkArrayCheckPTMessage *data) 
326   {
327     double t = CmiWallTimer();
328     // unpack it
329     envelope *env = UsrToEnv(data);
330     CkUnpackMessage(&env);
331     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
332     FILE *f = fopen(fname,"wb");
333     PUP::toDisk p(f);
334     CkPupMessage(p, (void **)&data);
335     // delay sync to the end because otherwise the messages are blocked
336     //    fsync(fileno(f));
337     fclose(f);
338     bud1 = data->bud1;
339     bud2 = data->bud2;
340     len = data->len;
341     delete data;
342     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
343   }
344   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
345   {
346     CkArrayCheckPTMessage *data;
347     FILE *f = fopen(fname,"rb");
348     PUP::fromDisk p(f);
349     CkPupMessage(p, (void **)&data);
350     fclose(f);
351     data->bud1 = bud1;                          // update the buddies
352     data->bud2 = bud2;
353     return data;
354   }
355   inline void updateBuddy(int b1, int b2) {
356     bud1 = b1; bud2 = b2;
357     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
358     CmiAssert(pNo != CkMyPe());
359   }
360   inline int getSize() { 
361     return len; 
362   }
363 };
364
365 CkMemCheckPT::CkMemCheckPT(int w)
366 {
367   int numnodes = 0;
368 #if NODE_CHECKPOINT
369   numnodes = CmiNumPhysicalNodes();
370 #else
371   numnodes = CkNumPes();
372 #endif
373 #if CK_NO_PROC_POOL
374   if (numnodes <= 2)
375 #else
376     if (numnodes  == 1)
377 #endif
378     {
379       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
380       _memChkptOn = 0;
381     }
382   inRestarting = 0;
383   inCheckpointing = 0;
384   recvCount = peCount = 0;
385   ackCount = 0;
386   expectCount = -1;
387   where = w;
388   replicaAlive = 1;
389   notifyReplica = 0;
390 #if CMK_CONVERSE_MPI
391   void pingBuddy();
392   void pingCheckHandler();
393   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
394   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
395 #endif
396   chkpTable[0] = NULL;
397   chkpTable[1] = NULL;
398   maxIter = -1;
399   recvIterCount = 0;
400   localDecided = false;
401 }
402
403 CkMemCheckPT::~CkMemCheckPT()
404 {
405   int len = ckTable.length();
406   for (int i=0; i<len; i++) {
407     delete ckTable[i];
408   }
409 }
410
411 void CkMemCheckPT::pup(PUP::er& p) 
412
413   CBase_CkMemCheckPT::pup(p); 
414   p|cpStarter;
415   p|thisFailedPe;
416   p|failedPes;
417   p|ckCheckPTGroupID;           // recover global variable
418   p|cpCallback;                 // store callback
419   p|where;                      // where to checkpoint
420   p|peCount;
421   if (p.isUnpacking()) {
422     recvCount = 0;
423 #if CMK_CONVERSE_MPI
424     void pingBuddy();
425     void pingCheckHandler();
426     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
427     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
428 #endif
429     maxIter = -1;
430     recvIterCount = 0;
431     localDecided = false;
432   }
433 }
434
435 void CkMemCheckPT::getIter(){
436   localDecided = true;
437   localMaxIter = maxIter+1;
438   if(CkMyPe()==0){
439     CkPrintf("local max iter is %d\n",localMaxIter);
440   }
441   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
442   int elemCount = CkCountChkpSyncElements();
443   if(CkMyPe()==0)
444     startTime = CmiWallTimer();
445   if(elemCount == 0){
446     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
447   }
448 }
449
450 //when one replica failes, decide the next chkp iter
451
452 void CkMemCheckPT::recvIter(int iter){
453   if(maxIter<iter){
454     maxIter=iter;
455   }
456 }
457
458 void CkMemCheckPT::recvMaxIter(int iter){
459   localDecided = false;
460   if(CkMyPe()==0)
461     CkPrintf("checkpoint iteration is %d\n",iter);
462   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
463 }
464
465 void CkMemCheckPT::reachChkpIter(){
466   recvIterCount++;
467   elemCount = CkCountChkpSyncElements();
468   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
469   if(recvIterCount == elemCount){
470     recvIterCount = 0;
471     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
472   }
473 }
474
475 void CkMemCheckPT::startChkp(){
476   if(CkInCheckpointing()){
477     return;
478   }
479   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
480   CkStartMemCheckpoint(cpCallback);
481 }
482
483 // called by checkpoint mgr to restore an array element
484 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
485 {
486 #if CMK_MEM_CHECKPOINT
487   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
488   // m->index.print();
489   PUP::fromMem p(m->packData);
490   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
491   CmiAssert(mgr);
492 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
493   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
494 #else
495   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
496 #endif
497
498   // find a list of array elements bound together
499   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
500   CmiAssert(elt);
501   CkLocRec_local *rec = elt->myRec;
502   CkVec<CkMigratable *> list;
503   mgr->migratableList(rec, list);
504   CmiAssert(list.length() > 0);
505   for (int l=0; l<list.length(); l++) {
506     elt = (ArrayElement *)list[l];
507     elt->budPEs[0] = m->bud1;
508     elt->budPEs[1] = m->bud2;
509     //    reset, may not needed now
510     // for now.
511     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
512       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
513       if (c) c->redNo = 0;
514     }
515   }
516 #endif
517 }
518
519 // return 1 if pe is a crashed processor
520 int CkMemCheckPT::isFailed(int pe)
521 {
522   for (int i=0; i<failedPes.length(); i++)
523     if (failedPes[i] == pe) return 1;
524   return 0;
525 }
526
527 // add pe into history list of all failed processors
528 void CkMemCheckPT::failed(int pe)
529 {
530   if (isFailed(pe)) return;
531   failedPes.push_back(pe);
532 }
533
534 int CkMemCheckPT::totalFailed()
535 {
536   return failedPes.length();
537 }
538
539 // create an checkpoint entry for array element of aid with index.
540 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
541 {
542   // error check, no duplicate
543   int idx, len = ckTable.size();
544   for (idx=0; idx<len; idx++) {
545     CkCheckPTInfo *entry = ckTable[idx];
546     if (index == entry->index) {
547       if (loc == entry->locMgr) {
548         // bindTo array elements
549         return;
550       }
551       // for array inheritance, the following check may fail
552       // because ArrayElement constructor of all superclasses are called
553       if (aid == entry->aid) {
554         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
555         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
556       }
557     }
558   }
559   CkCheckPTInfo *newEntry;
560   if (where == CkCheckPoint_inMEM)
561     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
562   else
563     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
564   ckTable.push_back(newEntry);
565   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
566 }
567
568 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
569 {
570 #if !CMK_CHKP_ALL       
571   int buddy = msg->bud1;
572   if (buddy == CkMyPe()) buddy = msg->bud2;
573   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
574   recvData(msg);
575   // ack
576   thisProxy[buddy].gotData();
577 #else
578   chkpTable[0] = NULL;
579   chkpTable[1] = NULL;
580   recvArrayCheckpoint(msg);
581   thisProxy[msg->bud2].gotData();
582 #endif
583 }
584
585 void CkMemCheckPT::chkpLocalStart()
586 {
587   CpvAccess(localStarted) = 1;
588 }
589
590 // loop through my checkpoint table and ask checkpointed array elements
591 // to send me checkpoint data.
592 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
593 void CkMemCheckPT::doItNow(int starter)
594 {
595   checkpointed = 1;
596   //cpCallback = cb;
597   cpStarter = starter;
598   inCheckpointing = 1;
599   if (CkMyPe() == cpStarter) {
600     startTime = CmiWallTimer();
601     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
602   }
603 #if CMK_CONVERSE_MPI
604   if(CmiNumPartition()==1)
605 #endif    
606   {
607 #if !CMK_CHKP_ALL
608     int len = ckTable.length();
609     for (int i=0; i<len; i++) {
610       CkCheckPTInfo *entry = ckTable[i];
611       // always let the bigger number processor send request
612       //if (CkMyPe() < entry->pNo) continue;
613       // always let the smaller number processor send request, may on same proc
614       if (!isMaster(entry->pNo)) continue;
615       // call inmem_checkpoint to the array element, ask it to send
616       // back checkpoint data via recvData().
617       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
618       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
619     }
620     // if my table is empty, then I am done
621     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
622 #else
623     startArrayCheckpoint();
624 #endif
625     // pack and send proc level data
626     sendProcData();
627   }
628 #if CMK_CONVERSE_MPI
629   else
630   {
631     startCheckpoint();
632   }
633 #endif
634 }
635
636 class MemElementPacker : public CkLocIterator{
637   private:
638     //    CkLocMgr *locMgr;
639     PUP::er &p;
640     std::map<CkHashCode,CkLocation> arrayMap;
641   public:
642     MemElementPacker(PUP::er &p_):p(p_){};
643     void addLocation(CkLocation &loc){
644       CkArrayIndexMax idx = loc.getIndex();
645       arrayMap[idx.hash()] = loc;
646     }
647     void writeCheckpoint(){
648       std::map<CkHashCode, CkLocation>::iterator it;
649       for(it = arrayMap.begin();it!=arrayMap.end();it++){
650         CkLocation loc = it->second;
651         CkLocMgr *locMgr = loc.getManager();
652         CkArrayIndexMax idx = loc.getIndex();
653         CkGroupID gID = locMgr->ckGetGroupID();
654         p|gID;
655         p|idx;
656         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
657       }
658     }
659 };
660
661 void pupAllElements(PUP::er &p){
662 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
663   int numElements;
664   if(!p.isUnpacking()){
665     numElements = CkCountArrayElements();
666   }
667   p | numElements;
668   if(!p.isUnpacking()){
669     MemElementPacker packer(p);
670     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
671     packer.writeCheckpoint();
672   }
673 #endif
674 }
675
676 void CkMemCheckPT::startArrayCheckpoint(){
677 #if CMK_CHKP_ALL
678   int size;
679   {
680     PUP::sizer psizer;
681     pupAllElements(psizer);
682     size = psizer.size();
683   }
684   int packSize = size/sizeof(double)+1;
685   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
686   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
687   msg->len = size;
688   msg->cp_flag = 1;
689   int budPEs[2];
690   msg->bud1=CkMyPe();
691   msg->bud2=ChkptOnPe(CkMyPe());
692   {
693     PUP::toMem p(msg->packData);
694     pupAllElements(p);
695   }
696   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
697   if(chkpTable[0]) delete chkpTable[0];
698   chkpTable[0] = msg;
699   //send the checkpoint to my 
700   recvCount++;
701 #endif
702 }
703
704 void CkMemCheckPT::startCheckpoint(){
705 #if CMK_CONVERSE_MPI
706   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
707     CkPrintf("in start checkpointing!!!!\n");
708   int size;
709   {
710     PUP::sizer p;
711     _handleProcData(p,CmiFalse);
712     size = p.size();
713   }
714   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
715   procMsg->len = size;
716   procMsg->cp_flag = 1;
717   {
718     PUP::toMem p(procMsg->packData);
719     _handleProcData(p,CmiFalse);
720   }
721   int pointer = CpvAccess(curPointer);
722   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
723   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
724
725   {
726     PUP::sizer psizer;
727     pupAllElements(psizer);
728     size = psizer.size();
729   }
730   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
731   msg->len = size;
732   msg->cp_flag = 1;
733   int checksum;
734   {
735     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
736       PUP::checker p(msg->packData);
737       pupAllElements(p);
738       checksum = p.getChecksum();
739     }else{  
740 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
741       PUP::toMem p(msg->packData);
742       pupAllElements(p);
743     }
744   }
745   pointer = CpvAccess(curPointer);
746   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
747   CpvAccess(chkpBuf)[pointer] = msg;
748   if(CkMyPe()==0)
749     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
750   if(CkReplicaAlive()==1){
751     CpvAccess(recvdLocal) = 1;
752     if(CpvAccess(use_checksum)){
753       CpvAccess(localChecksum) = checksum;
754       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
755       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
756       //only one reaplica will send
757       if(CmiMyPartition()==0){
758         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
759         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
760       }
761     }else{
762       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
763       CkPackMessage(&env);
764       if(CmiMyPartition()==0){
765         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
766         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
767       }
768     }
769     if(CmiMyPartition()==0){
770       notifyReplica = 1;
771       thisProxy[CkMyPe()].doneComparison(true);
772     }
773   }
774   if(CpvAccess(recvdRemote)==1){
775     //only partition 1 will do it
776     //compare the checkpoint 
777     int size = CpvAccess(chkpBuf)[pointer]->len;
778     if(CpvAccess(use_checksum)){
779       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
780         thisProxy[CkMyPe()].doneComparison(true);
781       }
782       else{
783         thisProxy[CkMyPe()].doneComparison(false);
784       }
785     }else{
786       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
787         thisProxy[CkMyPe()].doneComparison(true);
788       }
789       else{
790         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
791         thisProxy[CkMyPe()].doneComparison(false);
792       }
793     }
794     if(CkMyPe()==0)
795       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
796   }
797   else{
798     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
799       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
800       if(CpvAccess(remoteReady)==1){
801         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
802         {       
803           int pointer = CpvAccess(curPointer);
804           //send the proc data
805           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
806           procMsg->pointer = pointer;
807           envelope * env = (envelope *)(UsrToEnv(procMsg));
808           CkPackMessage(&env);
809           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
810           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
811           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
812             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
813         }
814         //send the array checkpoint data
815         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
816         msg->pointer = CpvAccess(curPointer);
817         envelope * env = (envelope *)(UsrToEnv(msg));
818         CkPackMessage(&env);
819         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
820         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
821         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
822           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
823       }else{
824         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
825           int pointer = CpvAccess(curPointer);
826           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
827           (CpvAccess(recoverProcBuf))->pointer = pointer;
828         }
829         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
830         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
831         CpvAccess(localReady) = 1;
832       }
833       //can continue work, no need to wait for my replica
834       int _ret = 1;
835       notifyReplica = 1;
836       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
837       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
838     }
839   }
840 #endif
841 }
842
843 void CkMemCheckPT::doneComparison(bool ret){
844   int _ret = 1;
845   if(!ret){
846     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
847     _ret = 0;
848   }else{
849     _ret = 1;
850   }
851   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
852   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
853 }
854
855 void CkMemCheckPT::doneRComparison(int ret){
856   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
857   //    if(CpvAccess(curPointer) == 0){
858   //if(ret==CkNumPes()){
859     CpvAccess(localChkpDone) = 1;
860     if(CpvAccess(remoteChkpDone) ==1){
861       thisProxy.doneBothComparison();
862     }else{
863       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
864     }
865   //}
866   /*else{
867     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
868     startTime = CmiWallTimer();
869     thisProxy.RollBack();
870   }*/
871   if(notifyReplica == 0){
872     //notify the replica am done
873     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
874     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
875     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
876     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
877     notifyReplica = 1;
878   }
879 }
880
881 void CkMemCheckPT::doneBothComparison(){
882   CpvAccess(recvdRemote) = 0;
883   CpvAccess(remoteReady) = 0;
884   CpvAccess(localReady) = 0;
885   CpvAccess(recvdLocal) = 0;
886   CpvAccess(localChkpDone) = 0;
887   CpvAccess(remoteChkpDone) = 0;
888   CpvAccess(remoteStarted) = 0;
889   CpvAccess(localStarted) = 0;
890   CpvAccess(_remoteCrashedNode) = -1;
891   CkMemCheckPT::replicaAlive = 1;
892   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
893   CpvAccess(curPointer)^=1;
894   inCheckpointing = 0;
895   notifyReplica = 0;
896   if(CkMyPe() == 0){
897     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
898   }
899   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
900 }
901
902 void CkMemCheckPT::RollBack(){
903   //restore group data
904   checkpointed = 0;
905   inRestarting = 1;
906   int pointer = CpvAccess(curPointer)^1;//use the previous one
907   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
908   PUP::fromMem p(chkpMsg->packData);    
909
910   //destroy array elements
911   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
912   int numGroups = CkpvAccess(_groupIDTable)->size();
913   for(int i=0;i<numGroups;i++) {
914     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
915     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
916     obj->flushStates();
917     obj->ckJustMigrated();
918   }
919   //restore array elements
920
921   int numElements;
922   p|numElements;
923
924   if(p.isUnpacking()){
925     for(int i=0;i<numElements;i++){
926       CkGroupID gID;
927       CkArrayIndex idx;
928       p|gID;
929       p|idx;
930       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
931       CmiAssert(mgr);
932       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
933     }
934     }
935     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
936     contribute(cb);
937   }
938
939   void CkMemCheckPT::notifyReplicaDie(int pe){
940     replicaAlive = 0;
941     CpvAccess(_remoteCrashedNode) = pe;
942   }
943
944   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
945   {
946 #if CMK_CHKP_ALL
947     int idx = 1;
948     if(msg->bud1 == CkMyPe()){
949       idx = 0;
950     }
951     int isChkpting = msg->cp_flag;
952     if(isChkpting == 1){
953       if(chkpTable[idx]) delete chkpTable[idx];
954     }
955     chkpTable[idx] = msg;
956     if(isChkpting){
957       recvCount++;
958       if(recvCount == 2){
959         if (where == CkCheckPoint_inMEM) {
960           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
961         }
962         else if (where == CkCheckPoint_inDISK) {
963           // another barrier for finalize the writing using fsync
964           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
965           contribute(0,NULL,CkReduction::sum_int,localcb);
966         }
967         else
968           CmiAbort("Unknown checkpoint scheme");
969         recvCount = 0;
970       }
971     }
972 #endif
973   }
974
975   // don't handle array elements
976   static inline void _handleProcData(PUP::er &p, CmiBool create)
977   {
978     // save readonlys, and callback BTW
979     CkPupROData(p);
980
981     // save mainchares 
982     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
983
984 #ifndef CMK_CHARE_USE_PTR
985     // save non-migratable chare
986     CkPupChareData(p);
987 #endif
988
989     // save groups into Groups.dat
990     CkPupGroupData(p,create);
991
992     // save nodegroups into NodeGroups.dat
993     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
994   }
995
996   void CkMemCheckPT::sendProcData()
997   {
998     // find out size of buffer
999     int size;
1000     {
1001       PUP::sizer p;
1002       _handleProcData(p,CmiTrue);
1003       size = p.size();
1004     }
1005     int packSize = size;
1006     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1007     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1008     {
1009       PUP::toMem p(msg->packData);
1010       _handleProcData(p,CmiTrue);
1011     }
1012     msg->pe = CkMyPe();
1013     msg->len = size;
1014     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1015     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1016   }
1017
1018   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1019   {
1020     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1021     CpvAccess(procChkptBuf) = msg;
1022     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1023     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1024   }
1025
1026   // ArrayElement call this function to give us the checkpointed data
1027   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1028   {
1029     int len = ckTable.length();
1030     int idx;
1031     for (idx=0; idx<len; idx++) {
1032       CkCheckPTInfo *entry = ckTable[idx];
1033       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1034     }
1035     CkAssert(idx < len);
1036     int isChkpting = msg->cp_flag;
1037     ckTable[idx]->updateBuffer(msg);
1038     if (isChkpting) {
1039       // all my array elements have returned their inmem data
1040       // inform starter processor that I am done.
1041       recvCount ++;
1042       if (recvCount == ckTable.length()) {
1043         if (where == CkCheckPoint_inMEM) {
1044           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1045         }
1046         else if (where == CkCheckPoint_inDISK) {
1047           // another barrier for finalize the writing using fsync
1048           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1049           contribute(0,NULL,CkReduction::sum_int,localcb);
1050         }
1051         else
1052           CmiAbort("Unknown checkpoint scheme");
1053         recvCount = 0;
1054       } 
1055     }
1056   }
1057
1058   // only used in disk checkpointing
1059   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1060   {
1061     delete m;
1062 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1063     system("sync");
1064 #endif
1065     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1066   }
1067
1068   // only is called on cpStarter when checkpoint is done
1069   void CkMemCheckPT::cpFinish()
1070   {
1071     CmiAssert(CkMyPe() == cpStarter);
1072     peCount++;
1073     // now that all processors have finished, activate callback
1074     if (peCount == 2) 
1075     {
1076       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1077       peCount = 0;
1078       thisProxy.report();
1079     }
1080   }
1081
1082   // for debugging, report checkpoint info
1083   void CkMemCheckPT::report()
1084   {
1085     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1086     inCheckpointing = 0;
1087 #if !CMK_CHKP_ALL
1088     int objsize = 0;
1089     int len = ckTable.length();
1090     for (int i=0; i<len; i++) {
1091       CkCheckPTInfo *entry = ckTable[i];
1092       CmiAssert(entry);
1093       objsize += entry->getSize();
1094     }
1095     CmiAssert(CpvAccess(procChkptBuf));
1096     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1097 #else
1098     if(CkMyPe()==0)
1099       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1100 #endif
1101   }
1102
1103   /*****************************************************************************
1104     RESTART Procedure
1105    *****************************************************************************/
1106
1107   // master processor of two buddies
1108   inline int CkMemCheckPT::isMaster(int buddype)
1109   {
1110 #if 0
1111     int mype = CkMyPe();
1112     //CkPrintf("ismaster: %d %d\n", pe, mype);
1113     if (CkNumPes() - totalFailed() == 2) {
1114       return mype > buddype;
1115     }
1116     for (int i=1; i<CkNumPes(); i++) {
1117       int me = (buddype+i)%CkNumPes();
1118       if (isFailed(me)) continue;
1119       if (me == mype) return 1;
1120       else return 0;
1121     }
1122     return 0;
1123 #else
1124     // smaller one
1125     int mype = CkMyPe();
1126     //CkPrintf("ismaster: %d %d\n", pe, mype);
1127     if (CkNumPes() - totalFailed() == 2) {
1128       return mype < buddype;
1129     }
1130 #if NODE_CHECKPOINT
1131     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1132     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1133 #else
1134       for (int i=1; i<CkNumPes(); i++) {
1135 #endif
1136         int me = (mype+i)%CkNumPes();
1137         if (isFailed(me)) continue;
1138         if (me == buddype) return 1;
1139         else return 0;
1140       }
1141       return 0;
1142 #endif
1143     }
1144
1145
1146
1147 #if 0
1148     // helper class to pup all elements that belong to same ckLocMgr
1149     class ElementDestoryer : public CkLocIterator {
1150       private:
1151         CkLocMgr *locMgr;
1152       public:
1153         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1154         void addLocation(CkLocation &loc) {
1155           CkArrayIndex idx=loc.getIndex();
1156           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1157           loc.destroy();
1158         }
1159     };
1160 #endif
1161
1162     // restore the bitmap vector for LB
1163     void CkMemCheckPT::resetLB(int diepe)
1164     {
1165 #if CMK_LBDB_ON
1166       int i;
1167       char *bitmap = new char[CkNumPes()];
1168       // set processor available bitmap
1169       get_avail_vector(bitmap);
1170       for (i=0; i<failedPes.length(); i++)
1171         bitmap[failedPes[i]] = 0; 
1172       bitmap[diepe] = 0;
1173
1174 #if CK_NO_PROC_POOL
1175       set_avail_vector(bitmap);
1176 #endif
1177
1178       // if I am the crashed pe, rebuild my failedPEs array
1179       if (CkMyNode() == diepe)
1180         for (i=0; i<CkNumPes(); i++) 
1181           if (bitmap[i]==0) failed(i);
1182
1183       delete [] bitmap;
1184 #endif
1185     }
1186
1187     static double restartT;
1188
1189     // in case when failedPe dies, everybody go through its checkpoint table:
1190     // destory all array elements
1191     // recover lost buddies
1192     // reconstruct all array elements from check point data
1193     // called on all processors
1194     void CkMemCheckPT::restart(int diePe)
1195     {
1196 #if CMK_MEM_CHECKPOINT
1197       thisFailedPe = diePe;
1198       double curTime = CmiWallTimer();
1199       if (CkMyPe() == thisFailedPe){
1200         restartT = CmiWallTimer();
1201         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1202       }
1203       stage = (char*)"resetLB";
1204       startTime = curTime;
1205       if (CkMyPe() == thisFailedPe)
1206         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1207
1208 //#if CK_NO_PROC_POOL
1209 //      failed(diePe);  // add into the list of failed pes
1210 //#endif
1211
1212 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1213
1214       inRestarting = 1;
1215
1216       // disable load balancer's barrier
1217       //if (CkMyPe() != diePe) resetLB(diePe);
1218
1219       CKLOCMGR_LOOP(mgr->startInserting(););
1220
1221
1222       if(CmiNumPartition()==1){
1223         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1224       }else{
1225         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1226         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1227       }
1228 #endif
1229     }
1230
1231     // loally remove all array elements
1232     void CkMemCheckPT::removeArrayElements()
1233     {
1234 #if CMK_MEM_CHECKPOINT
1235       int len = ckTable.length();
1236       double curTime = CmiWallTimer();
1237       if (CkMyPe() == thisFailedPe) 
1238         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1239       stage = (char*)"removeArrayElements";
1240       startTime = curTime;
1241
1242       //  if (cpCallback.isInvalid()) 
1243       //          CkPrintf("invalid pe %d\n",CkMyPe());
1244       //          CkAbort("Didn't set restart callback\n");;
1245       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1246
1247       // get rid of all buffering and remote recs
1248       // including destorying all array elements
1249 #if CK_NO_PROC_POOL  
1250       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1251 #else
1252       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1253 #endif
1254       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1255 #endif
1256     }
1257
1258     // flush state in reduction manager
1259     void CkMemCheckPT::resetReductionMgr()
1260     {
1261       if (CkMyPe() == thisFailedPe) 
1262         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1263       stage = (char *)"resetReductionMgr";
1264       int numGroups = CkpvAccess(_groupIDTable)->size();
1265       for(int i=0;i<numGroups;i++) {
1266         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1267         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1268         obj->flushStates();
1269         obj->ckJustMigrated();
1270       }
1271
1272       //reset CmiReduce
1273       CmiResetReductions();
1274
1275       // reset again
1276       //CpvAccess(_qd)->flushStates();
1277       if(CmiNumPartition()==1){
1278         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1279       }
1280       else{
1281         if (CkMyPe() == thisFailedPe) 
1282           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1283         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1284       }
1285     }
1286
1287     // recover the lost buddies
1288     void CkMemCheckPT::recoverBuddies()
1289     {
1290       int idx;
1291       int len = ckTable.length();
1292       // ready to flush reduction manager
1293       // cannot be CkMemCheckPT::restart because destory will modify states
1294       double curTime = CmiWallTimer();
1295       if (CkMyPe() == thisFailedPe)
1296         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1297       stage = (char *)"recoverBuddies";
1298       if (CkMyPe() == thisFailedPe)
1299         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1300       startTime = curTime;
1301
1302       // recover buddies
1303       expectCount = 0;
1304 #if !CMK_CHKP_ALL
1305       for (idx=0; idx<len; idx++) {
1306         CkCheckPTInfo *entry = ckTable[idx];
1307         if (entry->pNo == thisFailedPe) {
1308 #if CK_NO_PROC_POOL
1309           // find a new buddy
1310           int budPe = BuddyPE(CkMyPe());
1311 #else
1312           int budPe = thisFailedPe;
1313 #endif
1314           CkArrayCheckPTMessage *msg = entry->getCopy();
1315           msg->bud1 = budPe;
1316           msg->bud2 = CkMyPe();
1317           msg->cp_flag = 0;            // not checkpointing
1318           thisProxy[budPe].recoverEntry(msg);
1319           expectCount ++;
1320         }
1321       }
1322 #else
1323       //send to failed pe
1324       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1325 #if CK_NO_PROC_POOL
1326         // find a new buddy
1327         int budPe = BuddyPE(CkMyPe());
1328 #else
1329         int budPe = thisFailedPe;
1330 #endif
1331         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1332         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1333         msg->cp_flag = 0;            // not checkpointing
1334         msg->bud1 = budPe;
1335         msg->bud2 = CkMyPe();
1336         thisProxy[budPe].recoverEntry(msg);
1337         expectCount ++;
1338       }
1339 #endif
1340
1341       if (expectCount == 0) {
1342         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1343       }
1344       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1345     }
1346
1347     void CkMemCheckPT::gotData()
1348     {
1349       ackCount ++;
1350       if (ackCount == expectCount) {
1351         ackCount = 0;
1352         expectCount = -1;
1353         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1354         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1355       }
1356     }
1357
1358     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1359     {
1360
1361       for (int i=0; i<n; i++) {
1362         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1363         mgr->updateLocation(idx[i], nowOnPe);
1364       }
1365       thisProxy[nowOnPe].gotReply();
1366     }
1367
1368     // restore array elements
1369     void CkMemCheckPT::recoverArrayElements()
1370     {
1371       double curTime = CmiWallTimer();
1372       int len = ckTable.length();
1373       if (CkMyPe() == thisFailedPe)
1374         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1375       stage = (char *)"recoverArrayElements";
1376       startTime = curTime;
1377       int flag = 0;
1378       // recover all array elements
1379       int count = 0;
1380
1381 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1382       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1383       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1384 #endif
1385
1386 #if !CMK_CHKP_ALL
1387       for (int idx=0; idx<len; idx++)
1388       {
1389         CkCheckPTInfo *entry = ckTable[idx];
1390 #if CK_NO_PROC_POOL
1391         // the bigger one will do 
1392         //    if (CkMyPe() < entry->pNo) continue;
1393         if (!isMaster(entry->pNo)) continue;
1394 #else
1395         // smaller one do it, which has the original object
1396         if (CkMyPe() == entry->pNo+1 || 
1397             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1398 #endif
1399         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1400
1401         entry->updateBuddy(CkMyPe(), entry->pNo);
1402         CkArrayCheckPTMessage *msg = entry->getCopy();
1403         // gzheng
1404         //thisProxy[CkMyPe()].inmem_restore(msg);
1405         inmem_restore(msg);
1406 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1407         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1408         int homePe = mgr->homePe(msg->index);
1409         if (homePe != CkMyPe()) {
1410           gmap[homePe].push_back(msg->locMgr);
1411           imap[homePe].push_back(msg->index);
1412         }
1413 #endif
1414         CkFreeMsg(msg);
1415         count ++;
1416       }
1417 #else
1418       char * packData;
1419       if(CmiNumPartition()==1){
1420         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1421         packData = (char *)msg->packData;
1422       }
1423       else{
1424         int pointer = CpvAccess(curPointer);
1425         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1426         packData = msg->packData;
1427       }
1428 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1429       recoverAll(packData,gmap,imap);
1430 #else
1431       recoverAll(packData);
1432 #endif
1433 #endif
1434 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1435       for (int i=0; i<CkNumPes(); i++) {
1436         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1437           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1438           flag++;       
1439         }
1440       }
1441       delete [] imap;
1442       delete [] gmap;
1443 #endif
1444       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1445       //if (CkMyPe() == thisFailedPe)
1446
1447       CKLOCMGR_LOOP(mgr->doneInserting(););
1448
1449       // _crashedNode = -1;
1450       CpvAccess(_crashedNode) = -1;
1451 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1452       if (CkMyPe() == 0)
1453         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1454 #else
1455       if(flag == 0)
1456       {
1457         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1458       }
1459 #endif
1460     }
1461
1462     void CkMemCheckPT::gotReply(){
1463       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1464     }
1465
1466     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1467 #if CMK_CHKP_ALL
1468       PUP::fromMem p(packData);
1469       int numElements;
1470       p|numElements;
1471       if(p.isUnpacking()){
1472         for(int i=0;i<numElements;i++){
1473           CkGroupID gID;
1474           CkArrayIndex idx;
1475           p|gID;
1476           p|idx;
1477           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1478           int homePe = mgr->homePe(idx);
1479 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1480           mgr->resume(idx,p,CmiTrue,CmiTrue);
1481 #else
1482           if(CmiNumPartition()==1)
1483             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1484           else{
1485             if(CkMyPe()==thisFailedPe){
1486               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1487             }
1488             else{
1489               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1490             }
1491           }
1492 #endif
1493 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1494           homePe = mgr->homePe(idx);
1495           if (homePe != CkMyPe()) {
1496             gmap[homePe].push_back(gID);
1497             imap[homePe].push_back(idx);
1498           }
1499 #endif
1500         }
1501       }
1502 #endif
1503     }
1504
1505
1506     // on every processor
1507     // turn load balancer back on
1508     void CkMemCheckPT::finishUp()
1509     {
1510       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1511       //CKLOCMGR_LOOP(mgr->doneInserting(););
1512
1513       if (CkMyPe() == thisFailedPe)
1514       {
1515         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1516         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1517         fflush(stdout);
1518       }
1519       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1520       inRestarting = 0;
1521       maxIter = -1;
1522 #if CMK_CONVERSE_MPI    
1523       if(CmiNumPartition()!=1){
1524         CpvAccess(recvdProcChkp) = 0;
1525         CpvAccess(recvdArrayChkp) = 0;
1526         CpvAccess(curPointer)^=1;
1527         //notify my replica, restart is done
1528         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1529           CpvAccess(remoteStarted) =0;
1530           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1531           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1532           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1533         }
1534       }
1535       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1536         lastPingTime = CmiWallTimer();
1537         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1538       }
1539 #endif
1540
1541 #if CK_NO_PROC_POOL
1542 #if NODE_CHECKPOINT
1543       int numnodes = CmiNumPhysicalNodes();
1544 #else
1545       int numnodes = CkNumPes();
1546 #endif
1547       if (numnodes-totalFailed() <=2) {
1548         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1549         _memChkptOn = 0;
1550       }
1551 #endif
1552     }
1553
1554     void CkMemCheckPT::recoverFromSoftFailure()
1555     {
1556       inRestarting = 0;
1557       maxIter = -1;
1558       CpvAccess(recvdRemote) = 0;
1559       CpvAccess(recvdLocal) = 0;
1560       CpvAccess(localChkpDone) = 0;
1561       CpvAccess(remoteChkpDone) = 0;
1562       CpvAccess(remoteReady) = 0;
1563       CpvAccess(localReady) = 0;
1564       inCheckpointing = 0;
1565       notifyReplica = 0;
1566       CpvAccess(remoteStarted) = 0;
1567       CpvAccess(localStarted) = 0;
1568       CpvAccess(_remoteCrashedNode) = -1;
1569       CkMemCheckPT::replicaAlive = 1;
1570       inCheckpointing = 0;
1571       notifyReplica = 0;
1572       if(CkMyPe() == 0){
1573         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1574       }
1575       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1576     }
1577     // called only on 0
1578     void CkMemCheckPT::quiescence(CkCallback &cb)
1579     {
1580       static int pe_count = 0;
1581       pe_count ++;
1582       CmiAssert(CkMyPe() == 0);
1583       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1584       if (pe_count == CkNumPes()) {
1585         pe_count = 0;
1586         cb.send();
1587       }
1588     }
1589
1590     // User callable function - to start a checkpoint
1591     // callback cb is used to pass control back
1592     void CkStartMemCheckpoint(CkCallback &cb)
1593     {
1594 #if CMK_MEM_CHECKPOINT
1595       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1596       /*if (_memChkptOn == 0) {
1597         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1598         cb.send();
1599         return;
1600         }*/
1601       if (CkInRestarting()) {
1602         // trying to checkpointing during restart
1603         cb.send();
1604         return;
1605       }
1606       if(CkInCheckpointing())
1607         return;
1608       // store user callback and user data
1609       CkMemCheckPT::cpCallback = cb;
1610
1611
1612       //send to my replica that checkpoint begins 
1613       if(CkReplicaAlive()==1){
1614         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1615         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1616         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1617       }
1618       CpvAccess(localStarted) = 1;
1619       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1620       checkptMgr.chkpLocalStart();
1621       // broadcast to start check pointing
1622       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1623         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1624         checkptMgr.doItNow(CkMyPe());
1625       }
1626 #else
1627       // when mem checkpoint is disabled, invike cb immediately
1628       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1629       cb.send();
1630 #endif
1631     }
1632
1633     void CkRestartCheckPoint(int diePe)
1634     {
1635       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1636       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1637       // broadcast
1638       checkptMgr.restart(diePe);
1639     }
1640
1641     static int _diePE = -1;
1642
1643     // callback function used locally by ccs handler
1644     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1645     {
1646       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1647       CkRestartCheckPoint(_diePE);
1648     }
1649
1650
1651     // called on crashed PE
1652     static void restartBeginHandler(char *msg)
1653     {
1654       CmiFree(msg);
1655 #if CMK_MEM_CHECKPOINT
1656 #if CMK_USE_BARRIER
1657       if(CkMyPe()!=_diePE){
1658         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1659         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1660         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1661         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1662       }else{
1663         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1664         CkRestartCheckPointCallback(NULL, NULL);
1665       }
1666 #else
1667       static int count = 0;
1668       CmiAssert(CkMyPe() == _diePE);
1669       count ++;
1670       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1671         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1672         CkRestartCheckPointCallback(NULL, NULL);
1673         count = 0;
1674       }
1675 #endif
1676 #endif
1677     }
1678
1679     extern void _discard_charm_message();
1680     extern void _resume_charm_message();
1681
1682     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1683       return data;
1684     }
1685
1686     static void restartBcastHandler(char *msg)
1687     {
1688 #if CMK_MEM_CHECKPOINT
1689       // advance phase counter
1690       CkMemCheckPT::inRestarting = 1;
1691       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1692       // gzheng
1693       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1694
1695       if (CkMyPe()==_diePE)
1696         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1697
1698       // reset QD counters
1699       /*  gzheng
1700           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1701        */
1702
1703       /*  gzheng
1704           if (CkMyPe()==_diePE)
1705           CkRestartCheckPointCallback(NULL, NULL);
1706        */
1707       CmiFree(msg);
1708
1709       _resume_charm_message();
1710
1711       // reduction
1712       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1713       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1714 #if CMK_USE_BARRIER
1715       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1716 #else
1717       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1718 #endif 
1719       checkpointed = 0;
1720 #endif
1721     }
1722
1723     extern void _initDone();
1724
1725     bool compare(char * buf1, char *buf2){
1726       if(CkMyPe()==0)
1727         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1728       PUP::checker pchecker(buf1,buf2);
1729       pchecker.skip();
1730       int numElements;
1731       pchecker|numElements;
1732       for(int i=0;i<numElements;i++){
1733         CkGroupID gID;
1734         CkArrayIndex idx;
1735
1736         pchecker|gID;
1737         pchecker|idx;
1738
1739         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1740         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1741       }
1742       if(CkMyPe()==0)
1743         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1744       //int fault_num = pchecker.getFaultNum();
1745       bool result = pchecker.getResult();
1746       /*if(!result){
1747         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1748       }*/
1749       return result;
1750     }
1751     int getChecksum(char * buf){
1752       PUP::checker pchecker(buf);
1753       pchecker.skip();
1754       int numElements;
1755       pchecker|numElements;
1756 //      CkPrintf("num %d\n",numElements);
1757       for(int i=0;i<numElements;i++){
1758         CkGroupID gID;
1759         CkArrayIndex idx;
1760
1761         pchecker|gID;
1762         pchecker|idx;
1763
1764         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1765         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1766       }
1767       return pchecker.getChecksum();
1768     }
1769
1770     static void recvRemoteChkpHandler(char *msg){
1771       CpvAccess(remoteChkpDone) = 1;
1772       if(CpvAccess(use_checksum)){
1773         if(CkMyPe()==0)
1774           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1775         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1776         CpvAccess(recvdRemote) = 1;
1777         if(CpvAccess(recvdLocal)==1){
1778           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1779             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1780           }
1781           else{
1782             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1783           }
1784         }
1785       }else{
1786         envelope *env = (envelope *)msg;
1787         CkUnpackMessage(&env);
1788         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1789         if(CpvAccess(recvdLocal)==1){
1790           int pointer = CpvAccess(curPointer);
1791           int size = CpvAccess(chkpBuf)[pointer]->len;
1792           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1793             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1794           }else
1795           {
1796             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1797           }
1798           delete chkpMsg;
1799           if(CkMyPe()==0)
1800             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1801         }else{
1802           CpvAccess(recvdRemote) = 1;
1803           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1804           CpvAccess(buddyBuf) = chkpMsg;
1805         }
1806       }
1807     }
1808
1809     static void replicaRecoverHandler(char *msg){
1810       //fflush(stdout);
1811       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1812       CpvAccess(remoteChkpDone) = 1;
1813       if(CpvAccess(localChkpDone) == 1)
1814         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1815       else{  
1816         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1817         //CmiAbort("impossible state");
1818       }
1819       CmiFree(msg);
1820     }
1821
1822     static void replicaChkpDoneHandler(char *msg){
1823       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1824       CpvAccess(remoteChkpDone) = 1;
1825       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1826       if(CpvAccess(localChkpDone) == 1)
1827         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1828       else  
1829         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1830       CmiFree(msg);
1831     }
1832
1833     static void replicaDieHandler(char * msg){
1834 #if CMK_CONVERSE_MPI
1835       //broadcast to every one in my replica
1836       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1837       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1838 #endif
1839     }
1840
1841     static void replicaChkpStartHandler(char * msg){
1842       if(CkMyPe()==0){
1843         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1844       }
1845       CpvAccess(remoteStarted) =1;
1846       if(CpvAccess(localStarted)==1){    
1847         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1848         checkptMgr.doItNow(0);
1849       }
1850     }
1851
1852     static void replicaDieBcastHandler(char *msg){
1853       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1854       CpvAccess(_remoteCrashedNode) = diePe;
1855       if(CkMyPe()==diePe){
1856         CmiPrintf("pe %d in replica world die\n",diePe);
1857         fflush(stdout);
1858       }
1859       
1860       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1861         find_spare_mpirank(diePe,CmiMyPartition()^1);
1862       }
1863
1864       //what if am already ready to checkpoint
1865       if(CpvAccess(resilience)!=1){
1866         CkMemCheckPT::replicaAlive = 0;
1867         if(CpvAccess(localStarted)==1){    
1868           if(CkMyPe()==0){
1869             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1870             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1871             checkptMgr.doItNow(0);
1872           }
1873         }
1874         //broadcast to my partition to get local max iter
1875         else{
1876           if(CkMyPe()==diePe){
1877             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
1878           }
1879           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1880         }
1881       }
1882
1883       CmiFree(msg);
1884     }
1885
1886     static void recoverRemoteProcDataHandler(char *msg){
1887       envelope *env = (envelope *)msg;
1888       CkUnpackMessage(&env);
1889       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1890
1891       //store the checkpoint
1892       int pointer = procMsg->pointer;
1893
1894
1895       if(CkMyPe()==CpvAccess(_crashedNode)){
1896         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1897         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1898         PUP::fromMem p(procMsg->packData);
1899         _handleProcData(p,CmiTrue);
1900         _initDone();
1901         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
1902       }
1903       else{
1904         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1905         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1906         //_handleProcData(p,CmiFalse);
1907       }
1908       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1909       CKLOCMGR_LOOP(mgr->startInserting(););
1910
1911       CpvAccess(recvdProcChkp) =1;
1912       if(CpvAccess(recvdArrayChkp)==1){
1913         _resume_charm_message();
1914         _diePE = CpvAccess(_crashedNode);
1915         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1916         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1917         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1918 #if CMK_USE_BARRIER
1919         if(CpvAccess(resilience)==1){
1920           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1921         }else
1922           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1923 #else
1924         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1925 #endif 
1926       }
1927     }
1928
1929     static void recoverRemoteArrayDataHandler(char *msg){
1930       envelope *env = (envelope *)msg;
1931       CkUnpackMessage(&env);
1932       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1933
1934       //store the checkpoint
1935       int pointer = chkpMsg->pointer;
1936       CpvAccess(curPointer) = pointer;
1937       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1938       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1939       CpvAccess(recvdArrayChkp) =1;
1940       CkMemCheckPT::inRestarting = 1;
1941
1942       if(CkMyPe()==CpvAccess(_crashedNode))
1943         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
1944       
1945       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1946         _resume_charm_message();
1947         _diePE = CpvAccess(_crashedNode);
1948         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1949         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1950         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1951         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1952 #if CMK_USE_BARRIER
1953         if(CpvAccess(resilience)==1){
1954           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1955         }else
1956           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1957 #else
1958         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1959 #endif 
1960       }
1961     }
1962
1963     static void recvPhaseHandler(char * msg)
1964     {
1965       //decrease the phase number so that the crashed replica can communicate with the healthy one
1966       CpvAccess(_curRestartPhase)--;
1967       
1968       CkMemCheckPT::inRestarting = 1;
1969       CmiFree(msg);
1970       //notify the buddy in the replica now i can receive the checkpoint msg
1971       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1972         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1973         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1974         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1975         //timer start
1976         if(CpvAccess(_crashedNode)==CkMyPe()){
1977           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1978           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
1979         }
1980       }else{
1981         CpvAccess(curPointer)^=1;
1982         CkMemCheckPT::inRestarting = 1;
1983         _resume_charm_message();
1984         _diePE = CpvAccess(_crashedNode);
1985       }
1986     }
1987     
1988     static void askRecoverDataHandler(char * msg){
1989       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1990         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1991       if(CpvAccess(resilience)!=1){
1992         CpvAccess(remoteReady)=1;
1993         if(CpvAccess(localReady)==1){
1994           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1995           {     
1996             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1997             CkPackMessage(&env);
1998             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1999             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2000             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2001           }
2002           //send the array checkpoint data
2003           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2004           CkPackMessage(&env);
2005           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2006           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2007           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2008             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2009         }
2010       }else{
2011         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2012         int pointer = CpvAccess(curPointer)^1;
2013         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2014         procMsg->pointer = pointer;
2015         envelope * env = (envelope *)(UsrToEnv(procMsg));
2016         CkPackMessage(&env);
2017         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2018         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2019         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2020         
2021         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2022         arrayMsg->pointer = pointer;
2023         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2024         CkPackMessage(&env1);
2025         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2026         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2027         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2028       }
2029     }
2030     // called on crashed processor
2031     static void recoverProcDataHandler(char *msg)
2032     {
2033 #if CMK_MEM_CHECKPOINT
2034       int i;
2035       envelope *env = (envelope *)msg;
2036       CkUnpackMessage(&env);
2037       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2038       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2039       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2040       PUP::fromMem p(procMsg->packData);
2041       _handleProcData(p,CmiTrue);
2042
2043       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2044       // gzheng
2045       CKLOCMGR_LOOP(mgr->startInserting(););
2046
2047       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2048       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2049       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2050       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2051
2052       _initDone();
2053       //   CpvAccess(_qd)->flushStates();
2054       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2055 #endif
2056     }
2057     //for replica, got the phase number from my neighbor processor in the current partition
2058     static void askPhaseHandler(char *msg)
2059     {
2060 #if CMK_MEM_CHECKPOINT
2061       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2062       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2063       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2064       CmiSetHandler(msg, recvPhaseHandlerIdx);
2065       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2066 #endif
2067     }
2068     // called on its backup processor
2069     // get backup message buffer and sent to crashed processor
2070     static void askProcDataHandler(char *msg)
2071     {
2072 #if CMK_MEM_CHECKPOINT
2073       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2074       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2075       if (CpvAccess(procChkptBuf) == NULL)  {
2076         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2077         CkAbort("no checkpoint found");
2078       }
2079       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2080       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2081
2082       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2083
2084       CkPackMessage(&env);
2085       CmiSetHandler(env, recoverProcDataHandlerIdx);
2086       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2087       CpvAccess(procChkptBuf) = NULL;
2088       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2089 #endif
2090     }
2091
2092     // called on PE 0
2093     void qd_callback(void *m)
2094     {
2095       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2096       fflush(stdout);
2097       CkFreeMsg(m);
2098       if(CmiNumPartition()==1){
2099 #ifdef CMK_SMP
2100         for(int i=0;i<CmiMyNodeSize();i++){
2101           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2102           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2103           CmiSetHandler(msg, askProcDataHandlerIdx);
2104           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2105           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2106         }
2107         return;
2108 #endif
2109         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2110         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2111         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2112         CmiSetHandler(msg, askProcDataHandlerIdx);
2113         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2114         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2115       }
2116       else{
2117         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2118         CmiSetHandler(msg, recvPhaseHandlerIdx);
2119         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2120       }
2121     }
2122
2123     // on crashed node
2124     void CkMemRestart(const char *dummy, CkArgMsg *args)
2125     {
2126 #if CMK_MEM_CHECKPOINT
2127       _diePE = CmiMyNode();
2128       CpvAccess( _crashedNode )= CmiMyNode();
2129       CkMemCheckPT::inRestarting = 1;
2130       _discard_charm_message();
2131
2132       /*if(CmiMyRank()==0){
2133         CkCallback cb(qd_callback);
2134         CkStartQD(cb);
2135         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2136         }*/
2137       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2138       if(CmiNumPartition()==1){
2139         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2140         restartT = CmiWallTimer();
2141         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2142         fflush(stdout);
2143         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2144         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2145         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2146         CmiSetHandler(msg, askProcDataHandlerIdx);
2147         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2148         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2149       }
2150       else{
2151         CkCallback cb(qd_callback);
2152         CkStartQD(cb);
2153       }
2154 #else
2155       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2156 #endif
2157     }
2158
2159     // can be called in other files
2160     // return true if it is in restarting
2161     extern "C"
2162       int CkInRestarting()
2163       {
2164 #if CMK_MEM_CHECKPOINT
2165         if (CpvAccess( _crashedNode)!=-1) return 1;
2166         // gzheng
2167         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2168         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2169         return CkMemCheckPT::inRestarting;
2170 #else
2171         return 0;
2172 #endif
2173       }
2174
2175     extern "C"
2176       int CkReplicaAlive()
2177       {
2178         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2179         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2180         return CkMemCheckPT::replicaAlive;
2181
2182         /*if(CkMemCheckPT::replicaDead==1)
2183           return 0;
2184           else          
2185           return 1;*/
2186       }
2187
2188     extern "C"
2189       int CkInCheckpointing()
2190       {
2191         return CkMemCheckPT::inCheckpointing;
2192       }
2193
2194     extern "C"
2195       void CkSetInLdb(){
2196 #if CMK_MEM_CHECKPOINT
2197         CkMemCheckPT::inLoadbalancing = 1;
2198 #endif
2199       }
2200
2201     extern "C"
2202       int CkInLdb(){
2203 #if CMK_MEM_CHECKPOINT
2204         return CkMemCheckPT::inLoadbalancing;
2205 #endif
2206         return 0;
2207       }
2208
2209     extern "C"
2210       void CkResetInLdb(){
2211 #if CMK_MEM_CHECKPOINT
2212         CkMemCheckPT::inLoadbalancing = 0;
2213 #endif
2214       }
2215
2216     /*****************************************************************************
2217       module initialization
2218      *****************************************************************************/
2219
2220     static int arg_where = CkCheckPoint_inMEM;
2221
2222 #if CMK_MEM_CHECKPOINT
2223     void init_memcheckpt(char **argv)
2224     {
2225       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2226         arg_where = CkCheckPoint_inDISK;
2227       }
2228       CpvInitialize(int, use_checksum);
2229       CpvInitialize(int, resilience);
2230       CpvAccess(use_checksum)=0;
2231       CpvAccess(resilience)=0;//TODO should set a default resilience level
2232       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2233         CpvAccess(use_checksum)=1;
2234       }
2235       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2236         CpvAccess(resilience)=1;
2237       }
2238       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2239         CpvAccess(resilience)=2;
2240       }
2241       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2242         CpvAccess(resilience)=3;
2243       }
2244       // initiliazing _crashedNode variable
2245       CpvInitialize(int, _crashedNode);
2246       CpvInitialize(int, _remoteCrashedNode);
2247       CpvAccess(_crashedNode) = -1;
2248       CpvAccess(_remoteCrashedNode) = -1;
2249       init_FI(argv);
2250     }
2251 #endif
2252
2253     class CkMemCheckPTInit: public Chare {
2254       public:
2255         CkMemCheckPTInit(CkArgMsg *m) {
2256 #if CMK_MEM_CHECKPOINT
2257           if (arg_where == CkCheckPoint_inDISK) {
2258             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2259           }
2260           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2261           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2262 #endif
2263         }
2264     };
2265
2266     static void notifyHandler(char *msg)
2267     {
2268 #if CMK_MEM_CHECKPOINT
2269       CmiFree(msg);
2270       /* immediately increase restart phase to filter old messages */
2271       CpvAccess(_curRestartPhase) ++;
2272       CpvAccess(_qd)->flushStates();
2273       _discard_charm_message();
2274
2275 #endif
2276     }
2277
2278     extern "C"
2279       void notify_crash(int node)
2280       {
2281 #ifdef CMK_MEM_CHECKPOINT
2282         CpvAccess( _crashedNode) = node;
2283 #ifdef CMK_SMP
2284         for(int i=0;i<CkMyNodeSize();i++){
2285           CpvAccessOther(_crashedNode,i)=node;
2286         }
2287 #endif
2288         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2289         CkMemCheckPT::inRestarting = 1;
2290
2291         // this may be in interrupt handler, send a message to reset QD
2292         int pe = CmiNodeFirst(CkMyNode());
2293         for(int i=0;i<CkMyNodeSize();i++){
2294           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2295           CmiSetHandler(msg, notifyHandlerIdx);
2296           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2297         }
2298 #endif
2299       }
2300
2301     extern "C" void (*notify_crash_fn)(int node);
2302
2303
2304 #if CMK_CONVERSE_MPI
2305     void buddyDieHandler(char *msg)
2306     {
2307 #if CMK_MEM_CHECKPOINT
2308       // notify
2309       CkMemCheckPT::inRestarting = 1;
2310       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2311       notify_crash(diepe);
2312       // send message to crash pe to let it restart
2313       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2314       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2315       int buddy = obj->BuddyPE(CmiMyPe());
2316       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2317       if (buddy == diepe)  {
2318         mpi_restart_crashed(diepe, newrank);
2319       }
2320 #endif
2321     }
2322
2323     void pingHandler(void *msg)
2324     {
2325       lastPingTime = CmiWallTimer();
2326       CmiFree(msg);
2327     }
2328
2329     void pingCheckHandler()
2330     {
2331 #if CMK_MEM_CHECKPOINT
2332       double now = CmiWallTimer();
2333       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2334         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2335         int i, pe, buddy;
2336         // tell everyone the buddy dies
2337         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2338         for (i = 1; i < CmiNumPes(); i++) {
2339           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2340           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2341         }
2342         buddy = pe;
2343         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2344         fflush(stdout);
2345         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2346         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2347         CmiSetHandler(msg, buddyDieHandlerIdx);
2348         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2349         //send to everyone in the other world
2350         if(CmiNumPartition()!=1){
2351           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2352           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2353           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2354           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2355         }
2356       }
2357         else 
2358           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2359 #endif
2360       }
2361
2362       void pingBuddy()
2363       {
2364 #if CMK_MEM_CHECKPOINT
2365         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2366         if (obj) {
2367           int buddy = obj->BuddyPE(CkMyPe());
2368           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2369           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2370           CmiSetHandler(msg, pingHandlerIdx);
2371           CmiGetRestartPhase(msg) = 9999;
2372           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2373         }
2374         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2375 #endif
2376       }
2377 #endif
2378
2379       // initproc
2380       void CkRegisterRestartHandler( )
2381       {
2382 #if CMK_MEM_CHECKPOINT
2383         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2384         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2385         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2386         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2387         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2388         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2389
2390         //for replica
2391         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2392         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2393         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2394         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2395         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2396         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2397         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2398         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2399         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2400         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2401
2402 #if CMK_CONVERSE_MPI
2403         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2404         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2405         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2406 #endif
2407
2408         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2409         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2410         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2411         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2412         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2413         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2414         CpvInitialize(int,curPointer);
2415         CpvInitialize(int,recvdLocal);
2416         CpvInitialize(int,localChkpDone);
2417         CpvInitialize(int,remoteChkpDone);
2418         CpvInitialize(int,remoteStarted);
2419         CpvInitialize(int,localStarted);
2420         CpvInitialize(int,localReady);
2421         CpvInitialize(int,remoteReady);
2422         CpvInitialize(int,recvdRemote);
2423         CpvInitialize(int,recvdProcChkp);
2424         CpvInitialize(int,localChecksum);
2425         CpvInitialize(int,remoteChecksum);
2426         CpvInitialize(int,recvdArrayChkp);
2427
2428         CpvAccess(procChkptBuf) = NULL;
2429         CpvAccess(buddyBuf) = NULL;
2430         CpvAccess(recoverProcBuf) = NULL;
2431         CpvAccess(recoverArrayBuf) = NULL;
2432         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2433         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2434         CpvAccess(chkpBuf)[0] = NULL;
2435         CpvAccess(chkpBuf)[1] = NULL;
2436         CpvAccess(localProcChkpBuf)[0] = NULL;
2437         CpvAccess(localProcChkpBuf)[1] = NULL;
2438
2439         CpvAccess(curPointer) = 0;
2440         CpvAccess(recvdLocal) = 0;
2441         CpvAccess(localChkpDone) = 0;
2442         CpvAccess(remoteChkpDone) = 0;
2443         CpvAccess(remoteStarted) = 0;
2444         CpvAccess(localStarted) = 0;
2445         CpvAccess(localReady) = 0;
2446         CpvAccess(remoteReady) = 0;
2447         CpvAccess(recvdRemote) = 0;
2448         CpvAccess(recvdProcChkp) = 0;
2449         CpvAccess(localChecksum) = 0;
2450         CpvAccess(remoteChecksum) = 0;
2451         CpvAccess(recvdArrayChkp) = 0;
2452
2453         notify_crash_fn = notify_crash;
2454
2455 #if ! CMK_CONVERSE_MPI
2456         // print pid to kill
2457         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2458         //  sleep(4);
2459 #endif
2460 #endif
2461       }
2462
2463
2464       extern "C"
2465         int CkHasCheckpoints()
2466         {
2467           return checkpointed;
2468         }
2469
2470       /// @todo: the following definitions should be moved to a separate file containing
2471       // structures and functions about fault tolerance strategies
2472
2473       /**
2474        *  * @brief: function for killing a process                                             
2475        *   */
2476 #ifdef CMK_MEM_CHECKPOINT
2477 #if CMK_HAS_GETPID
2478       void killLocal(void *_dummy,double curWallTime){
2479         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2480         if(CmiWallTimer()<killTime-1){
2481           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2482         }else{ 
2483 #if CMK_CONVERSE_MPI
2484           CkDieNow();
2485 #else 
2486           kill(getpid(),SIGKILL);                                               
2487 #endif
2488         }              
2489       } 
2490 #else
2491       void killLocal(void *_dummy,double curWallTime){
2492         CmiAbort("kill() not supported!");
2493       }
2494 #endif
2495 #endif
2496
2497 #ifdef CMK_MEM_CHECKPOINT
2498       /**
2499        * @brief: reads the file with the kill information
2500        */
2501       void readKillFile(){
2502         FILE *fp=fopen(killFile,"r");
2503         if(!fp){
2504           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2505           return;
2506         }
2507         int proc;
2508         double sec;
2509         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2510           if(proc == CkMyNode() && CkMyRank() == 0){
2511             killTime = CmiWallTimer()+sec;
2512             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2513             CcdCallFnAfter(killLocal,NULL,sec*1000);
2514           }
2515         }
2516         fclose(fp);
2517       }
2518
2519 #if ! CMK_CONVERSE_MPI
2520       void CkDieNow()
2521       {
2522 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2523         // ignored for non-mpi version
2524         CmiPrintf("[%d] die now.\n", CmiMyPe());
2525         killTime = CmiWallTimer()+0.001;
2526         CcdCallFnAfter(killLocal,NULL,1);
2527 #endif
2528       }
2529 #endif
2530
2531 #endif
2532
2533 #include "CkMemCheckpoint.def.h"
2534
2535