b3aef384dadd3aa5d359f90f18fa4e355a9a98b5
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92 unsigned int failureSeed;
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // flag for failure distributiona
104 char * failureDist;
105 // Meantimr between failure
106 int MTBF;
107 // variable for storing the killing time
108 double killTime=0.0;
109 extern void killLocal(void *_dummy,double curWallTime);
110 #endif
111
112 #ifdef CKLOCMGR_LOOP
113 #undef CKLOCMGR_LOOP
114 #endif
115 // loop over all CkLocMgr and do "code"
116 #define  CKLOCMGR_LOOP(code)    {       \
117   int numGroups = CkpvAccess(_groupIDTable)->size();    \
118   for(int i=0;i<numGroups;i++) {        \
119     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
120     if(obj->isLocMgr())  {      \
121       CkLocMgr *mgr = (CkLocMgr*)obj;   \
122       code      \
123     }   \
124   }     \
125 }
126
127 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
128 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
129 CpvDeclare(CkCheckPTMessage**, chkpBuf);
130 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
131 //store the checkpoint of the buddy to compare
132 //do not need the whole msg, can be the checksum
133 CpvDeclare(CkCheckPTMessage*, buddyBuf);
134 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
135 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
136 //pointer of the checkpoint going to be written
137 CpvDeclare(int, curPointer);
138 CpvDeclare(int, recvdRemote);
139 CpvDeclare(int, recvdLocal);
140 CpvDeclare(int, localChkpDone);
141 CpvDeclare(int, remoteChkpDone);
142 CpvDeclare(int, remoteStarted);
143 CpvDeclare(int, localStarted);
144 CpvDeclare(int, remoteReady);
145 CpvDeclare(int, localReady);
146 CpvDeclare(int, recvdArrayChkp);
147 CpvDeclare(int, recvdProcChkp);
148 CpvDeclare(int, localChecksum);
149 CpvDeclare(int, remoteChecksum);
150
151 bool compare(char * buf1, char * buf2);
152 int getChecksum(char * buf);
153 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
154 // Converse function handles
155 static int askPhaseHandlerIdx;
156 static int recvPhaseHandlerIdx;
157 static int askProcDataHandlerIdx;
158 static int askRecoverDataHandlerIdx;
159 static int restartBcastHandlerIdx;
160 static int recoverProcDataHandlerIdx;
161 static int restartBeginHandlerIdx;
162 static int recvRemoteChkpHandlerIdx;
163 static int replicaDieHandlerIdx;
164 static int replicaChkpStartHandlerIdx;
165 static int replicaDieBcastHandlerIdx;
166 static int replicaRecoverHandlerIdx;
167 static int replicaChkpDoneHandlerIdx;
168 static int replicaBeginFailureInjectionHandlerIdx;
169 static int recoverRemoteProcDataHandlerIdx;
170 static int recoverRemoteArrayDataHandlerIdx;
171 static int notifyHandlerIdx;
172 // compute the backup processor
173 // FIXME: avoid crashed processors
174 #if CMK_CONVERSE_MPI
175 static int pingHandlerIdx;
176 static int pingCheckHandlerIdx;
177 static int buddyDieHandlerIdx;
178 static double lastPingTime = -1;
179
180 extern "C" void mpi_restart_crashed(int pe, int rank);
181 extern "C" int  find_spare_mpirank(int pe,int partition);
182
183 void pingBuddy();
184 void pingCheckHandler();
185
186 #endif
187 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
188
189 inline int CkMemCheckPT::BuddyPE(int pe)
190 {
191   int budpe;
192 #if NODE_CHECKPOINT
193   // buddy is the processor with same rank on the next physical node
194   int r1 = CmiPhysicalRank(pe);
195   int budnode = CmiPhysicalNodeID(pe);
196   do {
197     budnode = (budnode+1)%CmiNumPhysicalNodes();
198     int *pelist;
199     int num;
200     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
201     budpe = pelist[r1 % num];
202   } while (isFailed(budpe));
203   if (budpe == pe) {
204     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
205     CmiAbort("Failed to find a buddy processor");
206   }
207 #else
208   budpe = pe;
209   budpe = (budpe+1)%CkNumPes();
210 #endif
211   return budpe;
212 }
213
214 // called in array element constructor
215 // choose and register with 2 buddies for checkpoiting 
216 #if CMK_MEM_CHECKPOINT
217 void ArrayElement::init_checkpt() {
218   if (_memChkptOn == 0) return;
219   if (CkInRestarting()) {
220     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
221   }
222   // only master init checkpoint
223   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
224
225   budPEs[0] = CkMyPe();
226   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
227   CmiAssert(budPEs[0] != budPEs[1]);
228   // inform checkPTMgr
229   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
230   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
231   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
232 }
233 #endif
234
235 // entry function invoked by checkpoint mgr asking for checkpoint data
236 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
237 #if CMK_MEM_CHECKPOINT
238   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
239   //char index[128];   thisIndexMax.sprint(index);
240   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
241   CkLocMgr *locMgr = thisArray->getLocMgr();
242   CmiAssert(myRec!=NULL);
243   int size;
244   {
245     PUP::sizer p;
246     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
247     size = p.size();
248   }
249   int packSize = size/sizeof(double) +1;
250   CkArrayCheckPTMessage *msg =
251     new (packSize, 0) CkArrayCheckPTMessage;
252   msg->len = size;
253   msg->index =thisIndexMax;
254   msg->aid = thisArrayID;
255   msg->locMgr = locMgr->getGroupID();
256   msg->cp_flag = 1;
257   {
258     PUP::toMem p(msg->packData);
259     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
260   }
261
262   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
263   checkptMgr.recvData(msg, 2, budPEs);
264   delete m;
265 #endif
266 }
267
268 // checkpoint holder class - for memory checkpointing
269 class CkMemCheckPTInfo: public CkCheckPTInfo
270 {
271   CkArrayCheckPTMessage *ckBuffer;
272   public:
273   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
274     CkCheckPTInfo(a, loc, idx, pno)
275   {
276     ckBuffer = NULL;
277   }
278   ~CkMemCheckPTInfo() 
279   {
280     if (ckBuffer) delete ckBuffer; 
281   }
282   inline void updateBuffer(CkArrayCheckPTMessage *data) 
283   {
284     CmiAssert(data!=NULL);
285     if (ckBuffer) delete ckBuffer;
286     ckBuffer = data;
287   }    
288   inline CkArrayCheckPTMessage * getCopy()
289   {
290     if (ckBuffer == NULL) {
291       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
292       CmiAbort("Abort!");
293     }
294     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
295   }     
296   inline void updateBuddy(int b1, int b2) {
297     CmiAssert(ckBuffer);
298     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
299     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
300     CmiAssert(pNo != CkMyPe());
301   }
302   inline int getSize() { 
303     CmiAssert(ckBuffer);
304     return ckBuffer->len; 
305   }
306 };
307
308 // checkpoint holder class - for in-disk checkpointing
309 class CkDiskCheckPTInfo: public CkCheckPTInfo 
310 {
311   char *fname;
312   int bud1, bud2;
313   int len;                      // checkpoint size
314   public:
315   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
316   {
317 #if CMK_USE_MKSTEMP
318     fname = new char[64];
319     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
320     mkstemp(fname);
321 #else
322     fname=tmpnam(NULL);
323 #endif
324     bud1 = bud2 = -1;
325     len = 0;
326   }
327   ~CkDiskCheckPTInfo() 
328   {
329     remove(fname);
330   }
331   inline void updateBuffer(CkArrayCheckPTMessage *data) 
332   {
333     double t = CmiWallTimer();
334     // unpack it
335     envelope *env = UsrToEnv(data);
336     CkUnpackMessage(&env);
337     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
338     FILE *f = fopen(fname,"wb");
339     PUP::toDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     // delay sync to the end because otherwise the messages are blocked
342     //    fsync(fileno(f));
343     fclose(f);
344     bud1 = data->bud1;
345     bud2 = data->bud2;
346     len = data->len;
347     delete data;
348     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
349   }
350   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
351   {
352     CkArrayCheckPTMessage *data;
353     FILE *f = fopen(fname,"rb");
354     PUP::fromDisk p(f);
355     CkPupMessage(p, (void **)&data);
356     fclose(f);
357     data->bud1 = bud1;                          // update the buddies
358     data->bud2 = bud2;
359     return data;
360   }
361   inline void updateBuddy(int b1, int b2) {
362     bud1 = b1; bud2 = b2;
363     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
364     CmiAssert(pNo != CkMyPe());
365   }
366   inline int getSize() { 
367     return len; 
368   }
369 };
370
371 CkMemCheckPT::CkMemCheckPT(int w)
372 {
373   int numnodes = 0;
374 #if NODE_CHECKPOINT
375   numnodes = CmiNumPhysicalNodes();
376 #else
377   numnodes = CkNumPes();
378 #endif
379 #if CK_NO_PROC_POOL
380   if (numnodes <= 2)
381 #else
382     if (numnodes  == 1)
383 #endif
384     {
385       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
386       _memChkptOn = 0;
387     }
388   inRestarting = 0;
389   inCheckpointing = 0;
390   recvCount = peCount = 0;
391   ackCount = 0;
392   expectCount = -1;
393   where = w;
394   replicaAlive = 1;
395   notifyReplica = 0;
396 #if CMK_CONVERSE_MPI
397   void pingBuddy();
398   void pingCheckHandler();
399   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
400   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
401 #endif
402   chkpTable[0] = NULL;
403   chkpTable[1] = NULL;
404   maxIter = -1;
405   recvIterCount = 0;
406   localDecided = false;
407   if(killFlag == 2){
408     localSeed = failureSeed;
409     thisProxy[CkMyPe()].generateFailure();
410   }
411 }
412
413 void CkMemCheckPT::replicaInjectFailure(){
414   char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
415   CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
416   CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
417 }
418
419 void CkMemCheckPT::generateFailure(){
420   if(strcmp(failureDist,"E")==0){
421     int rand1 = rand_r(&localSeed);
422     int rand2 = rand_r(&localSeed);
423     int rand3 = rand_r(&localSeed);
424     CkPrintf("randome %d %d %d\n", rand1, rand2, rand3);
425     int next_pe = (rand1)%CkNumPes();
426     int next_partition = (rand2)%2;
427     double sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
428     if(next_pe == CmiMyPe()&& next_partition == CmiMyPartition()){
429       killTime = CmiWallTimer()+sec;
430       printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
431       CcdCallFnAfter(killLocal,NULL,sec*1000);
432     }
433   }
434 }
435
436 CkMemCheckPT::~CkMemCheckPT()
437 {
438   int len = ckTable.length();
439   for (int i=0; i<len; i++) {
440     delete ckTable[i];
441   }
442 }
443
444 void CkMemCheckPT::pup(PUP::er& p) 
445
446   CBase_CkMemCheckPT::pup(p); 
447   p|cpStarter;
448   p|thisFailedPe;
449   p|failedPes;
450   p|ckCheckPTGroupID;           // recover global variable
451   p|cpCallback;                 // store callback
452   p|where;                      // where to checkpoint
453   p|peCount;
454   p|localSeed;
455   if (p.isUnpacking()) {
456     recvCount = 0;
457 #if CMK_CONVERSE_MPI
458     void pingBuddy();
459     void pingCheckHandler();
460     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
461     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
462 #endif
463     maxIter = -1;
464     recvIterCount = 0;
465     localDecided = false;
466   }
467 }
468
469 void CkMemCheckPT::getIter(){
470   localDecided = true;
471   localMaxIter = maxIter+1;
472   if(CkMyPe()==0){
473     CkPrintf("local max iter is %d\n",localMaxIter);
474   }
475   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
476   int elemCount = CkCountChkpSyncElements();
477   if(CkMyPe()==0)
478     startTime = CmiWallTimer();
479   if(elemCount == 0){
480     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
481   }
482 }
483
484 //when one replica failes, decide the next chkp iter
485
486 void CkMemCheckPT::recvIter(int iter){
487   if(maxIter<iter){
488     maxIter=iter;
489   }
490 }
491
492 void CkMemCheckPT::recvMaxIter(int iter){
493   localDecided = false;
494   if(CkMyPe()==0)
495     CkPrintf("checkpoint iteration is %d\n",iter);
496   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
497 }
498
499 void CkMemCheckPT::reachChkpIter(){
500   recvIterCount++;
501   elemCount = CkCountChkpSyncElements();
502   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
503   if(recvIterCount == elemCount){
504     recvIterCount = 0;
505     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
506   }
507 }
508
509 void CkMemCheckPT::startChkp(){
510   if(CkInCheckpointing()){
511     return;
512   }
513   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
514   CkStartMemCheckpoint(cpCallback);
515 }
516
517 // called by checkpoint mgr to restore an array element
518 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
519 {
520 #if CMK_MEM_CHECKPOINT
521   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
522   // m->index.print();
523   PUP::fromMem p(m->packData);
524   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
525   CmiAssert(mgr);
526 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
527   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
528 #else
529   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
530 #endif
531
532   // find a list of array elements bound together
533   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
534   CmiAssert(elt);
535   CkLocRec_local *rec = elt->myRec;
536   CkVec<CkMigratable *> list;
537   mgr->migratableList(rec, list);
538   CmiAssert(list.length() > 0);
539   for (int l=0; l<list.length(); l++) {
540     elt = (ArrayElement *)list[l];
541     elt->budPEs[0] = m->bud1;
542     elt->budPEs[1] = m->bud2;
543     //    reset, may not needed now
544     // for now.
545     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
546       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
547       if (c) c->redNo = 0;
548     }
549   }
550 #endif
551 }
552
553 // return 1 if pe is a crashed processor
554 int CkMemCheckPT::isFailed(int pe)
555 {
556   for (int i=0; i<failedPes.length(); i++)
557     if (failedPes[i] == pe) return 1;
558   return 0;
559 }
560
561 // add pe into history list of all failed processors
562 void CkMemCheckPT::failed(int pe)
563 {
564   if (isFailed(pe)) return;
565   failedPes.push_back(pe);
566 }
567
568 int CkMemCheckPT::totalFailed()
569 {
570   return failedPes.length();
571 }
572
573 // create an checkpoint entry for array element of aid with index.
574 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
575 {
576   // error check, no duplicate
577   int idx, len = ckTable.size();
578   for (idx=0; idx<len; idx++) {
579     CkCheckPTInfo *entry = ckTable[idx];
580     if (index == entry->index) {
581       if (loc == entry->locMgr) {
582         // bindTo array elements
583         return;
584       }
585       // for array inheritance, the following check may fail
586       // because ArrayElement constructor of all superclasses are called
587       if (aid == entry->aid) {
588         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
589         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
590       }
591     }
592   }
593   CkCheckPTInfo *newEntry;
594   if (where == CkCheckPoint_inMEM)
595     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
596   else
597     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
598   ckTable.push_back(newEntry);
599   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
600 }
601
602 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
603 {
604 #if !CMK_CHKP_ALL       
605   int buddy = msg->bud1;
606   if (buddy == CkMyPe()) buddy = msg->bud2;
607   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
608   recvData(msg);
609   // ack
610   thisProxy[buddy].gotData();
611 #else
612   chkpTable[0] = NULL;
613   chkpTable[1] = NULL;
614   recvArrayCheckpoint(msg);
615   thisProxy[msg->bud2].gotData();
616 #endif
617 }
618
619 void CkMemCheckPT::chkpLocalStart()
620 {
621   CpvAccess(localStarted) = 1;
622 }
623
624 // loop through my checkpoint table and ask checkpointed array elements
625 // to send me checkpoint data.
626 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
627 void CkMemCheckPT::doItNow(int starter)
628 {
629   checkpointed = 1;
630   //cpCallback = cb;
631   cpStarter = starter;
632   inCheckpointing = 1;
633   if (CkMyPe() == cpStarter) {
634     startTime = CmiWallTimer();
635     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
636   }
637 #if CMK_CONVERSE_MPI
638   if(CmiNumPartition()==1)
639 #endif    
640   {
641 #if !CMK_CHKP_ALL
642     int len = ckTable.length();
643     for (int i=0; i<len; i++) {
644       CkCheckPTInfo *entry = ckTable[i];
645       // always let the bigger number processor send request
646       //if (CkMyPe() < entry->pNo) continue;
647       // always let the smaller number processor send request, may on same proc
648       if (!isMaster(entry->pNo)) continue;
649       // call inmem_checkpoint to the array element, ask it to send
650       // back checkpoint data via recvData().
651       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
652       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
653     }
654     // if my table is empty, then I am done
655     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
656 #else
657     startArrayCheckpoint();
658 #endif
659     // pack and send proc level data
660     sendProcData();
661   }
662 #if CMK_CONVERSE_MPI
663   else
664   {
665     startCheckpoint();
666   }
667 #endif
668 }
669
670 class MemElementPacker : public CkLocIterator{
671   private:
672     //    CkLocMgr *locMgr;
673     PUP::er &p;
674     std::map<CkHashCode,CkLocation> arrayMap;
675   public:
676     MemElementPacker(PUP::er &p_):p(p_){};
677     void addLocation(CkLocation &loc){
678       CkArrayIndexMax idx = loc.getIndex();
679       arrayMap[idx.hash()] = loc;
680     }
681     void writeCheckpoint(){
682       std::map<CkHashCode, CkLocation>::iterator it;
683       for(it = arrayMap.begin();it!=arrayMap.end();it++){
684         CkLocation loc = it->second;
685         CkLocMgr *locMgr = loc.getManager();
686         CkArrayIndexMax idx = loc.getIndex();
687         CkGroupID gID = locMgr->ckGetGroupID();
688         p|gID;
689         p|idx;
690         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
691       }
692     }
693 };
694
695 void pupAllElements(PUP::er &p){
696 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
697   int numElements;
698   if(!p.isUnpacking()){
699     numElements = CkCountArrayElements();
700   }
701   p | numElements;
702   if(!p.isUnpacking()){
703     MemElementPacker packer(p);
704     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
705     packer.writeCheckpoint();
706   }
707 #endif
708 }
709
710 void CkMemCheckPT::startArrayCheckpoint(){
711 #if CMK_CHKP_ALL
712   int size;
713   {
714     PUP::sizer psizer;
715     pupAllElements(psizer);
716     size = psizer.size();
717   }
718   int packSize = size/sizeof(double)+1;
719   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
720   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
721   msg->len = size;
722   msg->cp_flag = 1;
723   int budPEs[2];
724   msg->bud1=CkMyPe();
725   msg->bud2=ChkptOnPe(CkMyPe());
726   {
727     PUP::toMem p(msg->packData);
728     pupAllElements(p);
729   }
730   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
731   if(chkpTable[0]) delete chkpTable[0];
732   chkpTable[0] = msg;
733   //send the checkpoint to my 
734   recvCount++;
735 #endif
736 }
737
738 void CkMemCheckPT::startCheckpoint(){
739 #if CMK_CONVERSE_MPI
740   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
741     CkPrintf("in start checkpointing!!!!\n");
742   int size;
743   {
744     PUP::sizer p;
745     _handleProcData(p,CmiFalse);
746     size = p.size();
747   }
748   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
749   procMsg->len = size;
750   procMsg->cp_flag = 1;
751   {
752     PUP::toMem p(procMsg->packData);
753     _handleProcData(p,CmiFalse);
754   }
755   int pointer = CpvAccess(curPointer);
756   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
757   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
758
759   {
760     PUP::sizer psizer;
761     pupAllElements(psizer);
762     size = psizer.size();
763   }
764   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
765   msg->len = size;
766   msg->cp_flag = 1;
767   int checksum;
768   {
769     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
770       PUP::checker p(msg->packData);
771       pupAllElements(p);
772       checksum = p.getChecksum();
773     }else{  
774 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
775       PUP::toMem p(msg->packData);
776       pupAllElements(p);
777     }
778   }
779   pointer = CpvAccess(curPointer);
780   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
781   CpvAccess(chkpBuf)[pointer] = msg;
782   if(CkMyPe()==0)
783     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
784   if(CkReplicaAlive()==1){
785     CpvAccess(recvdLocal) = 1;
786     if(CpvAccess(use_checksum)){
787       CpvAccess(localChecksum) = checksum;
788       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
789       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
790       //only one reaplica will send
791       if(CmiMyPartition()==0){
792         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
793         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
794       }
795     }else{
796       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
797       CkPackMessage(&env);
798       if(CmiMyPartition()==0){
799         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
800         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
801       }
802     }
803     if(CmiMyPartition()==0){
804       notifyReplica = 1;
805       thisProxy[CkMyPe()].doneComparison(true);
806     }
807   }
808   if(CpvAccess(recvdRemote)==1){
809     //only partition 1 will do it
810     //compare the checkpoint 
811     int size = CpvAccess(chkpBuf)[pointer]->len;
812     if(CpvAccess(use_checksum)){
813       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
814         thisProxy[CkMyPe()].doneComparison(true);
815       }
816       else{
817         thisProxy[CkMyPe()].doneComparison(false);
818       }
819     }else{
820       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
821         thisProxy[CkMyPe()].doneComparison(true);
822       }
823       else{
824         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
825         thisProxy[CkMyPe()].doneComparison(false);
826       }
827     }
828     if(CkMyPe()==0)
829       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
830   }
831   else{
832     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
833       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
834       if(CpvAccess(remoteReady)==1){
835         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
836         {       
837           int pointer = CpvAccess(curPointer);
838           //send the proc data
839           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
840           procMsg->pointer = pointer;
841           envelope * env = (envelope *)(UsrToEnv(procMsg));
842           CkPackMessage(&env);
843           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
844           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
845           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
846             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
847         }
848         //send the array checkpoint data
849         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
850         msg->pointer = CpvAccess(curPointer);
851         envelope * env = (envelope *)(UsrToEnv(msg));
852         CkPackMessage(&env);
853         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
854         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
855         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
856           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
857       }else{
858         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
859           int pointer = CpvAccess(curPointer);
860           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
861           (CpvAccess(recoverProcBuf))->pointer = pointer;
862         }
863         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
864         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
865         CpvAccess(localReady) = 1;
866       }
867       //can continue work, no need to wait for my replica
868       int _ret = 1;
869       notifyReplica = 1;
870       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
871       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
872     }
873   }
874 #endif
875 }
876
877 void CkMemCheckPT::doneComparison(bool ret){
878   int _ret = 1;
879   if(!ret){
880     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
881     _ret = 0;
882   }else{
883     _ret = 1;
884   }
885   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
886   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
887 }
888
889 void CkMemCheckPT::doneRComparison(int ret){
890   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
891   //    if(CpvAccess(curPointer) == 0){
892   //if(ret==CkNumPes()){
893     CpvAccess(localChkpDone) = 1;
894     if(CpvAccess(remoteChkpDone) ==1){
895       thisProxy.doneBothComparison();
896     }else{
897       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
898     }
899   //}
900   /*else{
901     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
902     startTime = CmiWallTimer();
903     thisProxy.RollBack();
904   }*/
905   if(notifyReplica == 0){
906     //notify the replica am done
907     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
908     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
909     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
910     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
911     notifyReplica = 1;
912   }
913 }
914
915 void CkMemCheckPT::doneBothComparison(){
916   CpvAccess(recvdRemote) = 0;
917   CpvAccess(remoteReady) = 0;
918   CpvAccess(localReady) = 0;
919   CpvAccess(recvdLocal) = 0;
920   CpvAccess(localChkpDone) = 0;
921   CpvAccess(remoteChkpDone) = 0;
922   CpvAccess(remoteStarted) = 0;
923   CpvAccess(localStarted) = 0;
924   CpvAccess(_remoteCrashedNode) = -1;
925   CkMemCheckPT::replicaAlive = 1;
926   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
927   CpvAccess(curPointer)^=1;
928   inCheckpointing = 0;
929   notifyReplica = 0;
930   if(CkMyPe() == 0){
931     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
932   }
933   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
934 }
935
936 void CkMemCheckPT::RollBack(){
937   //restore group data
938   checkpointed = 0;
939   inRestarting = 1;
940   int pointer = CpvAccess(curPointer)^1;//use the previous one
941   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
942   PUP::fromMem p(chkpMsg->packData);    
943
944   //destroy array elements
945   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
946   int numGroups = CkpvAccess(_groupIDTable)->size();
947   for(int i=0;i<numGroups;i++) {
948     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
949     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
950     obj->flushStates();
951     obj->ckJustMigrated();
952   }
953   //restore array elements
954
955   int numElements;
956   p|numElements;
957
958   if(p.isUnpacking()){
959     for(int i=0;i<numElements;i++){
960       CkGroupID gID;
961       CkArrayIndex idx;
962       p|gID;
963       p|idx;
964       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
965       CmiAssert(mgr);
966       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
967     }
968     }
969     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
970     contribute(cb);
971   }
972
973   void CkMemCheckPT::notifyReplicaDie(int pe){
974     replicaAlive = 0;
975     CpvAccess(_remoteCrashedNode) = pe;
976   }
977
978   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
979   {
980 #if CMK_CHKP_ALL
981     int idx = 1;
982     if(msg->bud1 == CkMyPe()){
983       idx = 0;
984     }
985     int isChkpting = msg->cp_flag;
986     if(isChkpting == 1){
987       if(chkpTable[idx]) delete chkpTable[idx];
988     }
989     chkpTable[idx] = msg;
990     if(isChkpting){
991       recvCount++;
992       if(recvCount == 2){
993         if (where == CkCheckPoint_inMEM) {
994           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
995         }
996         else if (where == CkCheckPoint_inDISK) {
997           // another barrier for finalize the writing using fsync
998           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
999           contribute(0,NULL,CkReduction::sum_int,localcb);
1000         }
1001         else
1002           CmiAbort("Unknown checkpoint scheme");
1003         recvCount = 0;
1004       }
1005     }
1006 #endif
1007   }
1008
1009   // don't handle array elements
1010   static inline void _handleProcData(PUP::er &p, CmiBool create)
1011   {
1012     // save readonlys, and callback BTW
1013     CkPupROData(p);
1014
1015     // save mainchares 
1016     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
1017
1018 #ifndef CMK_CHARE_USE_PTR
1019     // save non-migratable chare
1020     CkPupChareData(p);
1021 #endif
1022
1023     // save groups into Groups.dat
1024     CkPupGroupData(p,create);
1025
1026     // save nodegroups into NodeGroups.dat
1027     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
1028   }
1029
1030   void CkMemCheckPT::sendProcData()
1031   {
1032     // find out size of buffer
1033     int size;
1034     {
1035       PUP::sizer p;
1036       _handleProcData(p,CmiTrue);
1037       size = p.size();
1038     }
1039     int packSize = size;
1040     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1041     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1042     {
1043       PUP::toMem p(msg->packData);
1044       _handleProcData(p,CmiTrue);
1045     }
1046     msg->pe = CkMyPe();
1047     msg->len = size;
1048     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1049     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1050   }
1051
1052   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1053   {
1054     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1055     CpvAccess(procChkptBuf) = msg;
1056     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1057     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1058   }
1059
1060   // ArrayElement call this function to give us the checkpointed data
1061   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1062   {
1063     int len = ckTable.length();
1064     int idx;
1065     for (idx=0; idx<len; idx++) {
1066       CkCheckPTInfo *entry = ckTable[idx];
1067       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1068     }
1069     CkAssert(idx < len);
1070     int isChkpting = msg->cp_flag;
1071     ckTable[idx]->updateBuffer(msg);
1072     if (isChkpting) {
1073       // all my array elements have returned their inmem data
1074       // inform starter processor that I am done.
1075       recvCount ++;
1076       if (recvCount == ckTable.length()) {
1077         if (where == CkCheckPoint_inMEM) {
1078           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1079         }
1080         else if (where == CkCheckPoint_inDISK) {
1081           // another barrier for finalize the writing using fsync
1082           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1083           contribute(0,NULL,CkReduction::sum_int,localcb);
1084         }
1085         else
1086           CmiAbort("Unknown checkpoint scheme");
1087         recvCount = 0;
1088       } 
1089     }
1090   }
1091
1092   // only used in disk checkpointing
1093   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1094   {
1095     delete m;
1096 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1097     system("sync");
1098 #endif
1099     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1100   }
1101
1102   // only is called on cpStarter when checkpoint is done
1103   void CkMemCheckPT::cpFinish()
1104   {
1105     CmiAssert(CkMyPe() == cpStarter);
1106     peCount++;
1107     // now that all processors have finished, activate callback
1108     if (peCount == 2) 
1109     {
1110       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1111       peCount = 0;
1112       thisProxy.report();
1113     }
1114   }
1115
1116   // for debugging, report checkpoint info
1117   void CkMemCheckPT::report()
1118   {
1119     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1120     inCheckpointing = 0;
1121 #if !CMK_CHKP_ALL
1122     int objsize = 0;
1123     int len = ckTable.length();
1124     for (int i=0; i<len; i++) {
1125       CkCheckPTInfo *entry = ckTable[i];
1126       CmiAssert(entry);
1127       objsize += entry->getSize();
1128     }
1129     CmiAssert(CpvAccess(procChkptBuf));
1130     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1131 #else
1132     if(CkMyPe()==0)
1133       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1134 #endif
1135   }
1136
1137   /*****************************************************************************
1138     RESTART Procedure
1139    *****************************************************************************/
1140
1141   // master processor of two buddies
1142   inline int CkMemCheckPT::isMaster(int buddype)
1143   {
1144 #if 0
1145     int mype = CkMyPe();
1146     //CkPrintf("ismaster: %d %d\n", pe, mype);
1147     if (CkNumPes() - totalFailed() == 2) {
1148       return mype > buddype;
1149     }
1150     for (int i=1; i<CkNumPes(); i++) {
1151       int me = (buddype+i)%CkNumPes();
1152       if (isFailed(me)) continue;
1153       if (me == mype) return 1;
1154       else return 0;
1155     }
1156     return 0;
1157 #else
1158     // smaller one
1159     int mype = CkMyPe();
1160     //CkPrintf("ismaster: %d %d\n", pe, mype);
1161     if (CkNumPes() - totalFailed() == 2) {
1162       return mype < buddype;
1163     }
1164 #if NODE_CHECKPOINT
1165     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1166     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1167 #else
1168       for (int i=1; i<CkNumPes(); i++) {
1169 #endif
1170         int me = (mype+i)%CkNumPes();
1171         if (isFailed(me)) continue;
1172         if (me == buddype) return 1;
1173         else return 0;
1174       }
1175       return 0;
1176 #endif
1177     }
1178
1179
1180
1181 #if 0
1182     // helper class to pup all elements that belong to same ckLocMgr
1183     class ElementDestoryer : public CkLocIterator {
1184       private:
1185         CkLocMgr *locMgr;
1186       public:
1187         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1188         void addLocation(CkLocation &loc) {
1189           CkArrayIndex idx=loc.getIndex();
1190           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1191           loc.destroy();
1192         }
1193     };
1194 #endif
1195
1196     // restore the bitmap vector for LB
1197     void CkMemCheckPT::resetLB(int diepe)
1198     {
1199 #if CMK_LBDB_ON
1200       int i;
1201       char *bitmap = new char[CkNumPes()];
1202       // set processor available bitmap
1203       get_avail_vector(bitmap);
1204       for (i=0; i<failedPes.length(); i++)
1205         bitmap[failedPes[i]] = 0; 
1206       bitmap[diepe] = 0;
1207
1208 #if CK_NO_PROC_POOL
1209       set_avail_vector(bitmap);
1210 #endif
1211
1212       // if I am the crashed pe, rebuild my failedPEs array
1213       if (CkMyNode() == diepe)
1214         for (i=0; i<CkNumPes(); i++) 
1215           if (bitmap[i]==0) failed(i);
1216
1217       delete [] bitmap;
1218 #endif
1219     }
1220
1221     static double restartT;
1222
1223     // in case when failedPe dies, everybody go through its checkpoint table:
1224     // destory all array elements
1225     // recover lost buddies
1226     // reconstruct all array elements from check point data
1227     // called on all processors
1228     void CkMemCheckPT::restart(int diePe)
1229     {
1230 #if CMK_MEM_CHECKPOINT
1231       thisFailedPe = diePe;
1232       double curTime = CmiWallTimer();
1233       if (CkMyPe() == thisFailedPe){
1234         restartT = CmiWallTimer();
1235         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1236       }
1237       stage = (char*)"resetLB";
1238       startTime = curTime;
1239       if (CkMyPe() == thisFailedPe)
1240         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1241
1242 //#if CK_NO_PROC_POOL
1243 //      failed(diePe);  // add into the list of failed pes
1244 //#endif
1245
1246 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1247
1248       inRestarting = 1;
1249
1250       // disable load balancer's barrier
1251       //if (CkMyPe() != diePe) resetLB(diePe);
1252
1253       CKLOCMGR_LOOP(mgr->startInserting(););
1254
1255
1256       if(CmiNumPartition()==1){
1257         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1258       }else{
1259         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1260         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1261       }
1262 #endif
1263     }
1264
1265     // loally remove all array elements
1266     void CkMemCheckPT::removeArrayElements()
1267     {
1268 #if CMK_MEM_CHECKPOINT
1269       int len = ckTable.length();
1270       double curTime = CmiWallTimer();
1271       if (CkMyPe() == thisFailedPe) 
1272         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1273       stage = (char*)"removeArrayElements";
1274       startTime = curTime;
1275
1276       //  if (cpCallback.isInvalid()) 
1277       //          CkPrintf("invalid pe %d\n",CkMyPe());
1278       //          CkAbort("Didn't set restart callback\n");;
1279       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1280
1281       // get rid of all buffering and remote recs
1282       // including destorying all array elements
1283 #if CK_NO_PROC_POOL  
1284       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1285 #else
1286       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1287 #endif
1288       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1289 #endif
1290     }
1291
1292     // flush state in reduction manager
1293     void CkMemCheckPT::resetReductionMgr()
1294     {
1295       if (CkMyPe() == thisFailedPe) 
1296         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1297       stage = (char *)"resetReductionMgr";
1298       int numGroups = CkpvAccess(_groupIDTable)->size();
1299       for(int i=0;i<numGroups;i++) {
1300         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1301         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1302         obj->flushStates();
1303         obj->ckJustMigrated();
1304       }
1305
1306       //reset CmiReduce
1307       CmiResetReductions();
1308
1309       // reset again
1310       //CpvAccess(_qd)->flushStates();
1311       if(CmiNumPartition()==1){
1312         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1313       }
1314       else{
1315         if (CkMyPe() == thisFailedPe) 
1316           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1317         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1318       }
1319     }
1320
1321     // recover the lost buddies
1322     void CkMemCheckPT::recoverBuddies()
1323     {
1324       int idx;
1325       int len = ckTable.length();
1326       // ready to flush reduction manager
1327       // cannot be CkMemCheckPT::restart because destory will modify states
1328       double curTime = CmiWallTimer();
1329       if (CkMyPe() == thisFailedPe)
1330         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1331       stage = (char *)"recoverBuddies";
1332       if (CkMyPe() == thisFailedPe)
1333         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1334       startTime = curTime;
1335
1336       // recover buddies
1337       expectCount = 0;
1338 #if !CMK_CHKP_ALL
1339       for (idx=0; idx<len; idx++) {
1340         CkCheckPTInfo *entry = ckTable[idx];
1341         if (entry->pNo == thisFailedPe) {
1342 #if CK_NO_PROC_POOL
1343           // find a new buddy
1344           int budPe = BuddyPE(CkMyPe());
1345 #else
1346           int budPe = thisFailedPe;
1347 #endif
1348           CkArrayCheckPTMessage *msg = entry->getCopy();
1349           msg->bud1 = budPe;
1350           msg->bud2 = CkMyPe();
1351           msg->cp_flag = 0;            // not checkpointing
1352           thisProxy[budPe].recoverEntry(msg);
1353           expectCount ++;
1354         }
1355       }
1356 #else
1357       //send to failed pe
1358       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1359 #if CK_NO_PROC_POOL
1360         // find a new buddy
1361         int budPe = BuddyPE(CkMyPe());
1362 #else
1363         int budPe = thisFailedPe;
1364 #endif
1365         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1366         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1367         msg->cp_flag = 0;            // not checkpointing
1368         msg->bud1 = budPe;
1369         msg->bud2 = CkMyPe();
1370         thisProxy[budPe].recoverEntry(msg);
1371         expectCount ++;
1372       }
1373 #endif
1374
1375       if (expectCount == 0) {
1376         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1377       }
1378       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1379     }
1380
1381     void CkMemCheckPT::gotData()
1382     {
1383       ackCount ++;
1384       if (ackCount == expectCount) {
1385         ackCount = 0;
1386         expectCount = -1;
1387         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1388         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1389       }
1390     }
1391
1392     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1393     {
1394
1395       for (int i=0; i<n; i++) {
1396         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1397         mgr->updateLocation(idx[i], nowOnPe);
1398       }
1399       thisProxy[nowOnPe].gotReply();
1400     }
1401
1402     // restore array elements
1403     void CkMemCheckPT::recoverArrayElements()
1404     {
1405       double curTime = CmiWallTimer();
1406       int len = ckTable.length();
1407       if (CkMyPe() == thisFailedPe)
1408         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1409       stage = (char *)"recoverArrayElements";
1410       startTime = curTime;
1411       int flag = 0;
1412       // recover all array elements
1413       int count = 0;
1414
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1417       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1418 #endif
1419
1420 #if !CMK_CHKP_ALL
1421       for (int idx=0; idx<len; idx++)
1422       {
1423         CkCheckPTInfo *entry = ckTable[idx];
1424 #if CK_NO_PROC_POOL
1425         // the bigger one will do 
1426         //    if (CkMyPe() < entry->pNo) continue;
1427         if (!isMaster(entry->pNo)) continue;
1428 #else
1429         // smaller one do it, which has the original object
1430         if (CkMyPe() == entry->pNo+1 || 
1431             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1432 #endif
1433         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1434
1435         entry->updateBuddy(CkMyPe(), entry->pNo);
1436         CkArrayCheckPTMessage *msg = entry->getCopy();
1437         // gzheng
1438         //thisProxy[CkMyPe()].inmem_restore(msg);
1439         inmem_restore(msg);
1440 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1441         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1442         int homePe = mgr->homePe(msg->index);
1443         if (homePe != CkMyPe()) {
1444           gmap[homePe].push_back(msg->locMgr);
1445           imap[homePe].push_back(msg->index);
1446         }
1447 #endif
1448         CkFreeMsg(msg);
1449         count ++;
1450       }
1451 #else
1452       char * packData;
1453       if(CmiNumPartition()==1){
1454         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1455         packData = (char *)msg->packData;
1456       }
1457       else{
1458         int pointer = CpvAccess(curPointer);
1459         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1460         packData = msg->packData;
1461       }
1462 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1463       recoverAll(packData,gmap,imap);
1464 #else
1465       recoverAll(packData);
1466 #endif
1467 #endif
1468 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1469       for (int i=0; i<CkNumPes(); i++) {
1470         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1471           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1472           flag++;       
1473         }
1474       }
1475       delete [] imap;
1476       delete [] gmap;
1477 #endif
1478       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1479       //if (CkMyPe() == thisFailedPe)
1480
1481       CKLOCMGR_LOOP(mgr->doneInserting(););
1482
1483       // _crashedNode = -1;
1484       CpvAccess(_crashedNode) = -1;
1485 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1486       if (CkMyPe() == 0)
1487         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1488 #else
1489       if(flag == 0)
1490       {
1491         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1492       }
1493 #endif
1494     }
1495
1496     void CkMemCheckPT::gotReply(){
1497       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1498     }
1499
1500     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1501 #if CMK_CHKP_ALL
1502       PUP::fromMem p(packData);
1503       int numElements;
1504       p|numElements;
1505       if(p.isUnpacking()){
1506         for(int i=0;i<numElements;i++){
1507           CkGroupID gID;
1508           CkArrayIndex idx;
1509           p|gID;
1510           p|idx;
1511           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1512           int homePe = mgr->homePe(idx);
1513 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1514           mgr->resume(idx,p,CmiTrue,CmiTrue);
1515 #else
1516           if(CmiNumPartition()==1)
1517             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1518           else{
1519             if(CkMyPe()==thisFailedPe){
1520               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1521             }
1522             else{
1523               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1524             }
1525           }
1526 #endif
1527 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1528           homePe = mgr->homePe(idx);
1529           if (homePe != CkMyPe()) {
1530             gmap[homePe].push_back(gID);
1531             imap[homePe].push_back(idx);
1532           }
1533 #endif
1534         }
1535       }
1536 #endif
1537     }
1538
1539
1540     // on every processor
1541     // turn load balancer back on
1542     void CkMemCheckPT::finishUp()
1543     {
1544       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1545       //CKLOCMGR_LOOP(mgr->doneInserting(););
1546
1547       if (CkMyPe() == thisFailedPe)
1548       {
1549         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1550         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1551         fflush(stdout);
1552       }
1553
1554       if(CkMyPe()==0){
1555         replicaInjectFailure();
1556       }
1557
1558       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1559       inRestarting = 0;
1560       maxIter = -1;
1561 #if CMK_CONVERSE_MPI    
1562       if(CmiNumPartition()!=1){
1563         CpvAccess(recvdProcChkp) = 0;
1564         CpvAccess(recvdArrayChkp) = 0;
1565         CpvAccess(curPointer)^=1;
1566         //notify my replica, restart is done
1567         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1568           CpvAccess(remoteStarted) =0;
1569           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1570           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1571           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1572         }
1573       }
1574       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1575         lastPingTime = CmiWallTimer();
1576         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1577       }
1578       //inject next failure
1579       thisProxy[CkMyPe()].generateFailure();
1580 #endif
1581
1582 #if CK_NO_PROC_POOL
1583 #if NODE_CHECKPOINT
1584       int numnodes = CmiNumPhysicalNodes();
1585 #else
1586       int numnodes = CkNumPes();
1587 #endif
1588       if (numnodes-totalFailed() <=2) {
1589         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1590         _memChkptOn = 0;
1591       }
1592 #endif
1593     }
1594
1595     void CkMemCheckPT::recoverFromSoftFailure()
1596     {
1597       inRestarting = 0;
1598       maxIter = -1;
1599       CpvAccess(recvdRemote) = 0;
1600       CpvAccess(recvdLocal) = 0;
1601       CpvAccess(localChkpDone) = 0;
1602       CpvAccess(remoteChkpDone) = 0;
1603       CpvAccess(remoteReady) = 0;
1604       CpvAccess(localReady) = 0;
1605       inCheckpointing = 0;
1606       notifyReplica = 0;
1607       CpvAccess(remoteStarted) = 0;
1608       CpvAccess(localStarted) = 0;
1609       CpvAccess(_remoteCrashedNode) = -1;
1610       CkMemCheckPT::replicaAlive = 1;
1611       inCheckpointing = 0;
1612       notifyReplica = 0;
1613       if(CkMyPe() == 0){
1614         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1615       }
1616       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1617     }
1618     // called only on 0
1619     void CkMemCheckPT::quiescence(CkCallback &cb)
1620     {
1621       static int pe_count = 0;
1622       pe_count ++;
1623       CmiAssert(CkMyPe() == 0);
1624       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1625       if (pe_count == CkNumPes()) {
1626         pe_count = 0;
1627         cb.send();
1628       }
1629     }
1630
1631     // User callable function - to start a checkpoint
1632     // callback cb is used to pass control back
1633     void CkStartMemCheckpoint(CkCallback &cb)
1634     {
1635 #if CMK_MEM_CHECKPOINT
1636       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1637       /*if (_memChkptOn == 0) {
1638         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1639         cb.send();
1640         return;
1641         }*/
1642       if (CkInRestarting()) {
1643         // trying to checkpointing during restart
1644         cb.send();
1645         return;
1646       }
1647       if(CkInCheckpointing())
1648         return;
1649       // store user callback and user data
1650       CkMemCheckPT::cpCallback = cb;
1651
1652
1653       //send to my replica that checkpoint begins 
1654       if(CkReplicaAlive()==1){
1655         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1656         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1657         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1658       }
1659       CpvAccess(localStarted) = 1;
1660       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1661       checkptMgr.chkpLocalStart();
1662       // broadcast to start check pointing
1663       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1664         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1665         checkptMgr.doItNow(CkMyPe());
1666       }
1667 #else
1668       // when mem checkpoint is disabled, invike cb immediately
1669       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1670       cb.send();
1671 #endif
1672     }
1673
1674     void CkRestartCheckPoint(int diePe)
1675     {
1676       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1677       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1678       // broadcast
1679       checkptMgr.restart(diePe);
1680     }
1681
1682     static int _diePE = -1;
1683
1684     // callback function used locally by ccs handler
1685     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1686     {
1687       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1688       CkRestartCheckPoint(_diePE);
1689     }
1690
1691
1692     // called on crashed PE
1693     static void restartBeginHandler(char *msg)
1694     {
1695       CmiFree(msg);
1696 #if CMK_MEM_CHECKPOINT
1697 #if CMK_USE_BARRIER
1698       if(CkMyPe()!=_diePE){
1699         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1700         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1701         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1702         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1703       }else{
1704         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1705         CkRestartCheckPointCallback(NULL, NULL);
1706       }
1707 #else
1708       static int count = 0;
1709       CmiAssert(CkMyPe() == _diePE);
1710       count ++;
1711       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1712         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1713         CkRestartCheckPointCallback(NULL, NULL);
1714         count = 0;
1715       }
1716 #endif
1717 #endif
1718     }
1719
1720     extern void _discard_charm_message();
1721     extern void _resume_charm_message();
1722
1723     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1724       return data;
1725     }
1726
1727     static void restartBcastHandler(char *msg)
1728     {
1729 #if CMK_MEM_CHECKPOINT
1730       // advance phase counter
1731       CkMemCheckPT::inRestarting = 1;
1732       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1733       // gzheng
1734       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1735
1736       if (CkMyPe()==_diePE)
1737         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1738
1739       // reset QD counters
1740       /*  gzheng
1741           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1742        */
1743
1744       /*  gzheng
1745           if (CkMyPe()==_diePE)
1746           CkRestartCheckPointCallback(NULL, NULL);
1747        */
1748       CmiFree(msg);
1749
1750       _resume_charm_message();
1751
1752       // reduction
1753       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1754       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1755 #if CMK_USE_BARRIER
1756       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1757 #else
1758       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1759 #endif 
1760       checkpointed = 0;
1761 #endif
1762     }
1763
1764     extern void _initDone();
1765
1766     bool compare(char * buf1, char *buf2){
1767       if(CkMyPe()==0)
1768         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1769       PUP::checker pchecker(buf1,buf2);
1770       pchecker.skip();
1771       int numElements;
1772       pchecker|numElements;
1773       for(int i=0;i<numElements;i++){
1774         CkGroupID gID;
1775         CkArrayIndex idx;
1776
1777         pchecker|gID;
1778         pchecker|idx;
1779
1780         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1781         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1782       }
1783       if(CkMyPe()==0)
1784         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1785       //int fault_num = pchecker.getFaultNum();
1786       bool result = pchecker.getResult();
1787       /*if(!result){
1788         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1789       }*/
1790       return result;
1791     }
1792     int getChecksum(char * buf){
1793       PUP::checker pchecker(buf);
1794       pchecker.skip();
1795       int numElements;
1796       pchecker|numElements;
1797 //      CkPrintf("num %d\n",numElements);
1798       for(int i=0;i<numElements;i++){
1799         CkGroupID gID;
1800         CkArrayIndex idx;
1801
1802         pchecker|gID;
1803         pchecker|idx;
1804
1805         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1806         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1807       }
1808       return pchecker.getChecksum();
1809     }
1810
1811     static void recvRemoteChkpHandler(char *msg){
1812       CpvAccess(remoteChkpDone) = 1;
1813       if(CpvAccess(use_checksum)){
1814         if(CkMyPe()==0)
1815           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1816         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1817         CpvAccess(recvdRemote) = 1;
1818         if(CpvAccess(recvdLocal)==1){
1819           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1820             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1821           }
1822           else{
1823             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1824           }
1825         }
1826       }else{
1827         envelope *env = (envelope *)msg;
1828         CkUnpackMessage(&env);
1829         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1830         if(CpvAccess(recvdLocal)==1){
1831           int pointer = CpvAccess(curPointer);
1832           int size = CpvAccess(chkpBuf)[pointer]->len;
1833           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1834             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1835           }else
1836           {
1837             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1838           }
1839           delete chkpMsg;
1840           if(CkMyPe()==0)
1841             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1842         }else{
1843           CpvAccess(recvdRemote) = 1;
1844           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1845           CpvAccess(buddyBuf) = chkpMsg;
1846         }
1847       }
1848     }
1849
1850     static void replicaRecoverHandler(char *msg){
1851       //fflush(stdout);
1852       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1853       CpvAccess(remoteChkpDone) = 1;
1854       if(CpvAccess(localChkpDone) == 1)
1855         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1856       else{  
1857         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1858         //CmiAbort("impossible state");
1859       }
1860       CmiFree(msg);
1861     }
1862
1863     
1864     static void replicaBeginFailureInjectionHandler(char * msg){
1865       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1866       checkptMgr.generateFailure();
1867       CmiFree(msg);
1868     }
1869     
1870     static void replicaChkpDoneHandler(char *msg){
1871       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1872       CpvAccess(remoteChkpDone) = 1;
1873       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1874       if(CpvAccess(localChkpDone) == 1)
1875         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1876       else  
1877         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1878       CmiFree(msg);
1879     }
1880
1881     static void replicaDieHandler(char * msg){
1882 #if CMK_CONVERSE_MPI
1883       //broadcast to every one in my replica
1884       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1885       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1886 #endif
1887     }
1888
1889     static void replicaChkpStartHandler(char * msg){
1890       if(CkMyPe()==0){
1891         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1892       }
1893       CpvAccess(remoteStarted) =1;
1894       if(CpvAccess(localStarted)==1){    
1895         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1896         checkptMgr.doItNow(0);
1897       }
1898     }
1899
1900     static void replicaDieBcastHandler(char *msg){
1901       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1902       CpvAccess(_remoteCrashedNode) = diePe;
1903       if(CkMyPe()==diePe){
1904         CmiPrintf("pe %d in replica world die\n",diePe);
1905         fflush(stdout);
1906       }
1907       
1908       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1909         find_spare_mpirank(diePe,CmiMyPartition()^1);
1910       }
1911
1912       //what if am already ready to checkpoint
1913       if(CpvAccess(resilience)!=1){
1914         CkMemCheckPT::replicaAlive = 0;
1915         if(CpvAccess(localStarted)==1){    
1916           if(CkMyPe()==0){
1917             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1918             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1919             checkptMgr.doItNow(0);
1920           }
1921         }
1922         //broadcast to my partition to get local max iter
1923         else{
1924           if(CkMyPe()==diePe){
1925             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
1926           }
1927           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1928         }
1929       }
1930
1931       CmiFree(msg);
1932     }
1933
1934     static void recoverRemoteProcDataHandler(char *msg){
1935       envelope *env = (envelope *)msg;
1936       CkUnpackMessage(&env);
1937       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1938
1939       //store the checkpoint
1940       int pointer = procMsg->pointer;
1941
1942
1943       if(CkMyPe()==CpvAccess(_crashedNode)){
1944         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1945         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1946         PUP::fromMem p(procMsg->packData);
1947         _handleProcData(p,CmiTrue);
1948         _initDone();
1949         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
1950       }
1951       else{
1952         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1953         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1954         //_handleProcData(p,CmiFalse);
1955       }
1956       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1957       CKLOCMGR_LOOP(mgr->startInserting(););
1958
1959       CpvAccess(recvdProcChkp) =1;
1960       if(CpvAccess(recvdArrayChkp)==1){
1961         _resume_charm_message();
1962         _diePE = CpvAccess(_crashedNode);
1963         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1964         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1965         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1966 #if CMK_USE_BARRIER
1967         if(CpvAccess(resilience)==1){
1968           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1969         }else
1970           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1971 #else
1972         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1973 #endif 
1974       }
1975     }
1976
1977     static void recoverRemoteArrayDataHandler(char *msg){
1978       envelope *env = (envelope *)msg;
1979       CkUnpackMessage(&env);
1980       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1981
1982       //store the checkpoint
1983       int pointer = chkpMsg->pointer;
1984       CpvAccess(curPointer) = pointer;
1985       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1986       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1987       CpvAccess(recvdArrayChkp) =1;
1988       CkMemCheckPT::inRestarting = 1;
1989
1990       if(CkMyPe()==CpvAccess(_crashedNode))
1991         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
1992       
1993       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1994         _resume_charm_message();
1995         _diePE = CpvAccess(_crashedNode);
1996         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1997         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1998         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1999         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2000 #if CMK_USE_BARRIER
2001         if(CpvAccess(resilience)==1){
2002           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2003         }else
2004           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2005 #else
2006         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2007 #endif 
2008       }
2009     }
2010
2011     static void recvPhaseHandler(char * msg)
2012     {
2013       //decrease the phase number so that the crashed replica can communicate with the healthy one
2014       CpvAccess(_curRestartPhase)--;
2015       
2016       CkMemCheckPT::inRestarting = 1;
2017       CmiFree(msg);
2018       //notify the buddy in the replica now i can receive the checkpoint msg
2019       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
2020         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2021         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
2022         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
2023         //timer start
2024         if(CpvAccess(_crashedNode)==CkMyPe()){
2025           CkMemCheckPT::startTime = restartT = CmiWallTimer();
2026           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
2027         }
2028       }else{
2029         CpvAccess(curPointer)^=1;
2030         CkMemCheckPT::inRestarting = 1;
2031         _resume_charm_message();
2032         _diePE = CpvAccess(_crashedNode);
2033       }
2034     }
2035     
2036     static void askRecoverDataHandler(char * msg){
2037       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2038         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
2039       if(CpvAccess(resilience)!=1){
2040         CpvAccess(remoteReady)=1;
2041         if(CpvAccess(localReady)==1){
2042           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2043           {     
2044             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
2045             CkPackMessage(&env);
2046             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2047             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2048             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2049           }
2050           //send the array checkpoint data
2051           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2052           CkPackMessage(&env);
2053           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2054           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2055           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2056             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2057         }
2058       }else{
2059         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2060         int pointer = CpvAccess(curPointer)^1;
2061         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2062         procMsg->pointer = pointer;
2063         envelope * env = (envelope *)(UsrToEnv(procMsg));
2064         CkPackMessage(&env);
2065         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2066         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2067         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2068         
2069         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2070         arrayMsg->pointer = pointer;
2071         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2072         CkPackMessage(&env1);
2073         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2074         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2075         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2076       }
2077     }
2078     // called on crashed processor
2079     static void recoverProcDataHandler(char *msg)
2080     {
2081 #if CMK_MEM_CHECKPOINT
2082       int i;
2083       envelope *env = (envelope *)msg;
2084       CkUnpackMessage(&env);
2085       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2086       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2087       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2088       PUP::fromMem p(procMsg->packData);
2089       _handleProcData(p,CmiTrue);
2090
2091       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2092       // gzheng
2093       CKLOCMGR_LOOP(mgr->startInserting(););
2094
2095       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2096       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2097       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2098       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2099
2100       _initDone();
2101       //   CpvAccess(_qd)->flushStates();
2102       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2103 #endif
2104     }
2105     //for replica, got the phase number from my neighbor processor in the current partition
2106     static void askPhaseHandler(char *msg)
2107     {
2108 #if CMK_MEM_CHECKPOINT
2109       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2110       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2111       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2112       CmiSetHandler(msg, recvPhaseHandlerIdx);
2113       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2114 #endif
2115     }
2116     // called on its backup processor
2117     // get backup message buffer and sent to crashed processor
2118     static void askProcDataHandler(char *msg)
2119     {
2120 #if CMK_MEM_CHECKPOINT
2121       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2122       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2123       if (CpvAccess(procChkptBuf) == NULL)  {
2124         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2125         CkAbort("no checkpoint found");
2126       }
2127       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2128       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2129
2130       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2131
2132       CkPackMessage(&env);
2133       CmiSetHandler(env, recoverProcDataHandlerIdx);
2134       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2135       CpvAccess(procChkptBuf) = NULL;
2136       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2137 #endif
2138     }
2139
2140     // called on PE 0
2141     void qd_callback(void *m)
2142     {
2143       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2144       fflush(stdout);
2145       CkFreeMsg(m);
2146       if(CmiNumPartition()==1){
2147 #ifdef CMK_SMP
2148         for(int i=0;i<CmiMyNodeSize();i++){
2149           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2150           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2151           CmiSetHandler(msg, askProcDataHandlerIdx);
2152           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2153           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2154         }
2155         return;
2156 #endif
2157         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2158         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2159         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2160         CmiSetHandler(msg, askProcDataHandlerIdx);
2161         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2162         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2163       }
2164       else{
2165         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2166         CmiSetHandler(msg, recvPhaseHandlerIdx);
2167         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2168       }
2169     }
2170
2171     // on crashed node
2172     void CkMemRestart(const char *dummy, CkArgMsg *args)
2173     {
2174 #if CMK_MEM_CHECKPOINT
2175       _diePE = CmiMyNode();
2176       CpvAccess( _crashedNode )= CmiMyNode();
2177       CkMemCheckPT::inRestarting = 1;
2178       _discard_charm_message();
2179
2180       /*if(CmiMyRank()==0){
2181         CkCallback cb(qd_callback);
2182         CkStartQD(cb);
2183         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2184         }*/
2185       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2186       if(CmiNumPartition()==1){
2187         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2188         restartT = CmiWallTimer();
2189         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2190         fflush(stdout);
2191         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2192         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2193         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2194         CmiSetHandler(msg, askProcDataHandlerIdx);
2195         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2196         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2197       }
2198       else{
2199         CkCallback cb(qd_callback);
2200         CkStartQD(cb);
2201       }
2202 #else
2203       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2204 #endif
2205     }
2206
2207     // can be called in other files
2208     // return true if it is in restarting
2209     extern "C"
2210       int CkInRestarting()
2211       {
2212 #if CMK_MEM_CHECKPOINT
2213         if (CpvAccess( _crashedNode)!=-1) return 1;
2214         // gzheng
2215         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2216         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2217         return CkMemCheckPT::inRestarting;
2218 #else
2219         return 0;
2220 #endif
2221       }
2222
2223     extern "C"
2224       int CkReplicaAlive()
2225       {
2226         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2227         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2228         return CkMemCheckPT::replicaAlive;
2229
2230         /*if(CkMemCheckPT::replicaDead==1)
2231           return 0;
2232           else          
2233           return 1;*/
2234       }
2235
2236     extern "C"
2237       int CkInCheckpointing()
2238       {
2239         return CkMemCheckPT::inCheckpointing;
2240       }
2241
2242     extern "C"
2243       void CkSetInLdb(){
2244 #if CMK_MEM_CHECKPOINT
2245         CkMemCheckPT::inLoadbalancing = 1;
2246 #endif
2247       }
2248
2249     extern "C"
2250       int CkInLdb(){
2251 #if CMK_MEM_CHECKPOINT
2252         return CkMemCheckPT::inLoadbalancing;
2253 #endif
2254         return 0;
2255       }
2256
2257     extern "C"
2258       void CkResetInLdb(){
2259 #if CMK_MEM_CHECKPOINT
2260         CkMemCheckPT::inLoadbalancing = 0;
2261 #endif
2262       }
2263
2264     /*****************************************************************************
2265       module initialization
2266      *****************************************************************************/
2267
2268     static int arg_where = CkCheckPoint_inMEM;
2269
2270 #if CMK_MEM_CHECKPOINT
2271     void init_memcheckpt(char **argv)
2272     {
2273       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2274         arg_where = CkCheckPoint_inDISK;
2275       }
2276       CpvInitialize(int, use_checksum);
2277       CpvInitialize(int, resilience);
2278       CpvAccess(use_checksum)=0;
2279       CpvAccess(resilience)=0;//TODO should set a default resilience level
2280       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2281         CpvAccess(use_checksum)=1;
2282       }
2283       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2284         CpvAccess(resilience)=1;
2285       }
2286       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2287         CpvAccess(resilience)=2;
2288       }
2289       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2290         CpvAccess(resilience)=3;
2291       }
2292       // initiliazing _crashedNode variable
2293       CpvInitialize(int, _crashedNode);
2294       CpvInitialize(int, _remoteCrashedNode);
2295       CpvAccess(_crashedNode) = -1;
2296       CpvAccess(_remoteCrashedNode) = -1;
2297       init_FI(argv);
2298     }
2299 #endif
2300
2301     class CkMemCheckPTInit: public Chare {
2302       public:
2303         CkMemCheckPTInit(CkArgMsg *m) {
2304 #if CMK_MEM_CHECKPOINT
2305           if (arg_where == CkCheckPoint_inDISK) {
2306             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2307           }
2308
2309           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2310           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2311 #endif
2312         }
2313     };
2314
2315     static void notifyHandler(char *msg)
2316     {
2317 #if CMK_MEM_CHECKPOINT
2318       CmiFree(msg);
2319       /* immediately increase restart phase to filter old messages */
2320       CpvAccess(_curRestartPhase) ++;
2321       CpvAccess(_qd)->flushStates();
2322       _discard_charm_message();
2323
2324 #endif
2325     }
2326
2327     extern "C"
2328       void notify_crash(int node)
2329       {
2330 #ifdef CMK_MEM_CHECKPOINT
2331         CpvAccess( _crashedNode) = node;
2332 #ifdef CMK_SMP
2333         for(int i=0;i<CkMyNodeSize();i++){
2334           CpvAccessOther(_crashedNode,i)=node;
2335         }
2336 #endif
2337         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2338         CkMemCheckPT::inRestarting = 1;
2339
2340         // this may be in interrupt handler, send a message to reset QD
2341         int pe = CmiNodeFirst(CkMyNode());
2342         for(int i=0;i<CkMyNodeSize();i++){
2343           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2344           CmiSetHandler(msg, notifyHandlerIdx);
2345           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2346         }
2347 #endif
2348       }
2349
2350     extern "C" void (*notify_crash_fn)(int node);
2351
2352
2353 #if CMK_CONVERSE_MPI
2354     void buddyDieHandler(char *msg)
2355     {
2356 #if CMK_MEM_CHECKPOINT
2357       // notify
2358       CkMemCheckPT::inRestarting = 1;
2359       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2360       notify_crash(diepe);
2361       // send message to crash pe to let it restart
2362       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2363       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2364       int buddy = obj->BuddyPE(CmiMyPe());
2365       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2366       if (buddy == diepe)  {
2367         mpi_restart_crashed(diepe, newrank);
2368       }
2369 #endif
2370     }
2371
2372     void pingHandler(void *msg)
2373     {
2374       lastPingTime = CmiWallTimer();
2375       CmiFree(msg);
2376     }
2377
2378     void pingCheckHandler()
2379     {
2380 #if CMK_MEM_CHECKPOINT
2381       double now = CmiWallTimer();
2382       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2383         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2384         int i, pe, buddy;
2385         // tell everyone the buddy dies
2386         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2387         for (i = 1; i < CmiNumPes(); i++) {
2388           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2389           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2390         }
2391         buddy = pe;
2392         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2393         fflush(stdout);
2394         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2395         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2396         CmiSetHandler(msg, buddyDieHandlerIdx);
2397         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2398         //send to everyone in the other world
2399         if(CmiNumPartition()!=1){
2400           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2401           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2402           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2403           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2404         }
2405       }
2406         else 
2407           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2408 #endif
2409       }
2410
2411       void pingBuddy()
2412       {
2413 #if CMK_MEM_CHECKPOINT
2414         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2415         if (obj) {
2416           int buddy = obj->BuddyPE(CkMyPe());
2417           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2418           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2419           CmiSetHandler(msg, pingHandlerIdx);
2420           CmiGetRestartPhase(msg) = 9999;
2421           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2422         }
2423         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2424 #endif
2425       }
2426 #endif
2427
2428       // initproc
2429       void CkRegisterRestartHandler( )
2430       {
2431 #if CMK_MEM_CHECKPOINT
2432         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2433         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2434         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2435         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2436         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2437         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2438
2439         //for replica
2440         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2441         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2442         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2443         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2444         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2445         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2446         replicaBeginFailureInjectionHandlerIdx = CkRegisterHandler((CmiHandler)replicaBeginFailureInjectionHandler);
2447         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2448         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2449         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2450         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2451
2452 #if CMK_CONVERSE_MPI
2453         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2454         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2455         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2456 #endif
2457
2458         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2459         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2460         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2461         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2462         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2463         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2464         CpvInitialize(int,curPointer);
2465         CpvInitialize(int,recvdLocal);
2466         CpvInitialize(int,localChkpDone);
2467         CpvInitialize(int,remoteChkpDone);
2468         CpvInitialize(int,remoteStarted);
2469         CpvInitialize(int,localStarted);
2470         CpvInitialize(int,localReady);
2471         CpvInitialize(int,remoteReady);
2472         CpvInitialize(int,recvdRemote);
2473         CpvInitialize(int,recvdProcChkp);
2474         CpvInitialize(int,localChecksum);
2475         CpvInitialize(int,remoteChecksum);
2476         CpvInitialize(int,recvdArrayChkp);
2477
2478         CpvAccess(procChkptBuf) = NULL;
2479         CpvAccess(buddyBuf) = NULL;
2480         CpvAccess(recoverProcBuf) = NULL;
2481         CpvAccess(recoverArrayBuf) = NULL;
2482         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2483         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2484         CpvAccess(chkpBuf)[0] = NULL;
2485         CpvAccess(chkpBuf)[1] = NULL;
2486         CpvAccess(localProcChkpBuf)[0] = NULL;
2487         CpvAccess(localProcChkpBuf)[1] = NULL;
2488
2489         CpvAccess(curPointer) = 0;
2490         CpvAccess(recvdLocal) = 0;
2491         CpvAccess(localChkpDone) = 0;
2492         CpvAccess(remoteChkpDone) = 0;
2493         CpvAccess(remoteStarted) = 0;
2494         CpvAccess(localStarted) = 0;
2495         CpvAccess(localReady) = 0;
2496         CpvAccess(remoteReady) = 0;
2497         CpvAccess(recvdRemote) = 0;
2498         CpvAccess(recvdProcChkp) = 0;
2499         CpvAccess(localChecksum) = 0;
2500         CpvAccess(remoteChecksum) = 0;
2501         CpvAccess(recvdArrayChkp) = 0;
2502
2503         notify_crash_fn = notify_crash;
2504
2505 #if ! CMK_CONVERSE_MPI
2506         // print pid to kill
2507         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2508         //  sleep(4);
2509 #endif
2510 #endif
2511       }
2512
2513
2514       extern "C"
2515         int CkHasCheckpoints()
2516         {
2517           return checkpointed;
2518         }
2519
2520       /// @todo: the following definitions should be moved to a separate file containing
2521       // structures and functions about fault tolerance strategies
2522
2523       /**
2524        *  * @brief: function for killing a process                                             
2525        *   */
2526 #ifdef CMK_MEM_CHECKPOINT
2527 #if CMK_HAS_GETPID
2528       void killLocal(void *_dummy,double curWallTime){
2529         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2530         if(CmiWallTimer()<killTime-1){
2531           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2532         }else{ 
2533 #if CMK_CONVERSE_MPI
2534           if(CkHasCheckpoints()&&!CkInCheckpointing()&&!CkInRestarting()){
2535             CkDieNow();
2536           }else{
2537             //next failure
2538             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
2539             checkptMgr.generateFailure();
2540             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->replicaInjectFailure();
2541           }
2542 #else 
2543           kill(getpid(),SIGKILL);                                               
2544 #endif
2545         }              
2546       } 
2547 #else
2548       void killLocal(void *_dummy,double curWallTime){
2549         CmiAbort("kill() not supported!");
2550       }
2551 #endif
2552 #endif
2553
2554 #ifdef CMK_MEM_CHECKPOINT
2555       /**
2556        * @brief: reads the file with the kill information
2557        */
2558       void readKillFile(){
2559         FILE *fp=fopen(killFile,"r");
2560         if(!fp){
2561           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2562           return;
2563         }
2564         int proc;
2565         double sec;
2566         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2567           if(proc == CkMyNode() && CkMyRank() == 0){
2568             killTime = CmiWallTimer()+sec;
2569             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2570             CcdCallFnAfter(killLocal,NULL,sec*1000);
2571           }
2572         }
2573         fclose(fp);
2574       }
2575
2576
2577 #if ! CMK_CONVERSE_MPI
2578       void CkDieNow()
2579       {
2580 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2581         // ignored for non-mpi version
2582         CmiPrintf("[%d] die now.\n", CmiMyPe());
2583         killTime = CmiWallTimer()+0.001;
2584         CcdCallFnAfter(killLocal,NULL,1);
2585 #endif
2586       }
2587 #endif
2588
2589 #endif
2590
2591 #include "CkMemCheckpoint.def.h"
2592
2593