inject soft and hard failures according to exponential and weibull distribution
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92 unsigned int failureSeed;
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // flag for failure distributiona
104 char * failureDist;
105 // Meantimr between failure
106 int MTBF;
107 int SMTBF;
108 //shape parameter for Weibull distribution
109 double beta;
110 double alpha;
111 double s_alpha;
112 // variable for storing the killing time
113 double killTime=0.0;
114 extern void killLocal(void *_dummy,double curWallTime);
115 extern void injectSoftFailure(void *_dummy,double curWallTime);
116 #endif
117
118 #ifdef CKLOCMGR_LOOP
119 #undef CKLOCMGR_LOOP
120 #endif
121 // loop over all CkLocMgr and do "code"
122 #define  CKLOCMGR_LOOP(code)    {       \
123   int numGroups = CkpvAccess(_groupIDTable)->size();    \
124   for(int i=0;i<numGroups;i++) {        \
125     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
126     if(obj->isLocMgr())  {      \
127       CkLocMgr *mgr = (CkLocMgr*)obj;   \
128       code      \
129     }   \
130   }     \
131 }
132
133 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
134 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
135 CpvDeclare(CkCheckPTMessage**, chkpBuf);
136 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
137 //store the checkpoint of the buddy to compare
138 //do not need the whole msg, can be the checksum
139 CpvDeclare(CkCheckPTMessage*, buddyBuf);
140 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
141 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
142 //pointer of the checkpoint going to be written
143 CpvDeclare(int, curPointer);
144 CpvDeclare(int, recvdRemote);
145 CpvDeclare(int, recvdLocal);
146 CpvDeclare(int, localChkpDone);
147 CpvDeclare(int, remoteChkpDone);
148 CpvDeclare(int, remoteStarted);
149 CpvDeclare(int, localStarted);
150 CpvDeclare(int, remoteReady);
151 CpvDeclare(int, localReady);
152 CpvDeclare(int, recvdArrayChkp);
153 CpvDeclare(int, recvdProcChkp);
154 CpvDeclare(int, localChecksum);
155 CpvDeclare(int, remoteChecksum);
156
157 bool compare(char * buf1, char * buf2);
158 int getChecksum(char * buf);
159 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
160 // Converse function handles
161 static int askPhaseHandlerIdx;
162 static int recvPhaseHandlerIdx;
163 static int askProcDataHandlerIdx;
164 static int askRecoverDataHandlerIdx;
165 static int restartBcastHandlerIdx;
166 static int recoverProcDataHandlerIdx;
167 static int restartBeginHandlerIdx;
168 static int recvRemoteChkpHandlerIdx;
169 static int replicaDieHandlerIdx;
170 static int replicaChkpStartHandlerIdx;
171 static int replicaDieBcastHandlerIdx;
172 static int replicaRecoverHandlerIdx;
173 static int replicaChkpDoneHandlerIdx;
174 static int replicaBeginFailureInjectionHandlerIdx;
175 static int recoverRemoteProcDataHandlerIdx;
176 static int recoverRemoteArrayDataHandlerIdx;
177 static int notifyHandlerIdx;
178 // compute the backup processor
179 // FIXME: avoid crashed processors
180 #if CMK_CONVERSE_MPI
181 static int pingHandlerIdx;
182 static int pingCheckHandlerIdx;
183 static int buddyDieHandlerIdx;
184 static double lastPingTime = -1;
185
186 extern "C" void mpi_restart_crashed(int pe, int rank);
187 extern "C" int  find_spare_mpirank(int pe,int partition);
188
189 void pingBuddy();
190 void pingCheckHandler();
191
192 #endif
193 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
194
195 inline int CkMemCheckPT::BuddyPE(int pe)
196 {
197   int budpe;
198 #if NODE_CHECKPOINT
199   // buddy is the processor with same rank on the next physical node
200   int r1 = CmiPhysicalRank(pe);
201   int budnode = CmiPhysicalNodeID(pe);
202   do {
203     budnode = (budnode+1)%CmiNumPhysicalNodes();
204     int *pelist;
205     int num;
206     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
207     budpe = pelist[r1 % num];
208   } while (isFailed(budpe));
209   if (budpe == pe) {
210     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
211     CmiAbort("Failed to find a buddy processor");
212   }
213 #else
214   budpe = pe;
215   budpe = (budpe+1)%CkNumPes();
216 #endif
217   return budpe;
218 }
219
220 // called in array element constructor
221 // choose and register with 2 buddies for checkpoiting 
222 #if CMK_MEM_CHECKPOINT
223 void ArrayElement::init_checkpt() {
224   if (_memChkptOn == 0) return;
225   if (CkInRestarting()) {
226     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
227   }
228   // only master init checkpoint
229   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
230
231   budPEs[0] = CkMyPe();
232   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
233   CmiAssert(budPEs[0] != budPEs[1]);
234   // inform checkPTMgr
235   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
236   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
237   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
238 }
239 #endif
240
241 // entry function invoked by checkpoint mgr asking for checkpoint data
242 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
243 #if CMK_MEM_CHECKPOINT
244   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
245   //char index[128];   thisIndexMax.sprint(index);
246   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
247   CkLocMgr *locMgr = thisArray->getLocMgr();
248   CmiAssert(myRec!=NULL);
249   int size;
250   {
251     PUP::sizer p;
252     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
253     size = p.size();
254   }
255   int packSize = size/sizeof(double) +1;
256   CkArrayCheckPTMessage *msg =
257     new (packSize, 0) CkArrayCheckPTMessage;
258   msg->len = size;
259   msg->index =thisIndexMax;
260   msg->aid = thisArrayID;
261   msg->locMgr = locMgr->getGroupID();
262   msg->cp_flag = 1;
263   {
264     PUP::toMem p(msg->packData);
265     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
266   }
267
268   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
269   checkptMgr.recvData(msg, 2, budPEs);
270   delete m;
271 #endif
272 }
273
274 // checkpoint holder class - for memory checkpointing
275 class CkMemCheckPTInfo: public CkCheckPTInfo
276 {
277   CkArrayCheckPTMessage *ckBuffer;
278   public:
279   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
280     CkCheckPTInfo(a, loc, idx, pno)
281   {
282     ckBuffer = NULL;
283   }
284   ~CkMemCheckPTInfo() 
285   {
286     if (ckBuffer) delete ckBuffer; 
287   }
288   inline void updateBuffer(CkArrayCheckPTMessage *data) 
289   {
290     CmiAssert(data!=NULL);
291     if (ckBuffer) delete ckBuffer;
292     ckBuffer = data;
293   }    
294   inline CkArrayCheckPTMessage * getCopy()
295   {
296     if (ckBuffer == NULL) {
297       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
298       CmiAbort("Abort!");
299     }
300     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
301   }     
302   inline void updateBuddy(int b1, int b2) {
303     CmiAssert(ckBuffer);
304     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
305     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
306     CmiAssert(pNo != CkMyPe());
307   }
308   inline int getSize() { 
309     CmiAssert(ckBuffer);
310     return ckBuffer->len; 
311   }
312 };
313
314 // checkpoint holder class - for in-disk checkpointing
315 class CkDiskCheckPTInfo: public CkCheckPTInfo 
316 {
317   char *fname;
318   int bud1, bud2;
319   int len;                      // checkpoint size
320   public:
321   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
322   {
323 #if CMK_USE_MKSTEMP
324     fname = new char[64];
325     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
326     mkstemp(fname);
327 #else
328     fname=tmpnam(NULL);
329 #endif
330     bud1 = bud2 = -1;
331     len = 0;
332   }
333   ~CkDiskCheckPTInfo() 
334   {
335     remove(fname);
336   }
337   inline void updateBuffer(CkArrayCheckPTMessage *data) 
338   {
339     double t = CmiWallTimer();
340     // unpack it
341     envelope *env = UsrToEnv(data);
342     CkUnpackMessage(&env);
343     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
344     FILE *f = fopen(fname,"wb");
345     PUP::toDisk p(f);
346     CkPupMessage(p, (void **)&data);
347     // delay sync to the end because otherwise the messages are blocked
348     //    fsync(fileno(f));
349     fclose(f);
350     bud1 = data->bud1;
351     bud2 = data->bud2;
352     len = data->len;
353     delete data;
354     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
355   }
356   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
357   {
358     CkArrayCheckPTMessage *data;
359     FILE *f = fopen(fname,"rb");
360     PUP::fromDisk p(f);
361     CkPupMessage(p, (void **)&data);
362     fclose(f);
363     data->bud1 = bud1;                          // update the buddies
364     data->bud2 = bud2;
365     return data;
366   }
367   inline void updateBuddy(int b1, int b2) {
368     bud1 = b1; bud2 = b2;
369     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
370     CmiAssert(pNo != CkMyPe());
371   }
372   inline int getSize() { 
373     return len; 
374   }
375 };
376
377 CkMemCheckPT::CkMemCheckPT(int w)
378 {
379   int numnodes = 0;
380 #if NODE_CHECKPOINT
381   numnodes = CmiNumPhysicalNodes();
382 #else
383   numnodes = CkNumPes();
384 #endif
385 #if CK_NO_PROC_POOL
386   if (numnodes <= 2)
387 #else
388     if (numnodes  == 1)
389 #endif
390     {
391       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
392       _memChkptOn = 0;
393     }
394   inRestarting = 0;
395   inCheckpointing = 0;
396   recvCount = peCount = 0;
397   ackCount = 0;
398   expectCount = -1;
399   where = w;
400   replicaAlive = 1;
401   notifyReplica = 0;
402 #if CMK_CONVERSE_MPI
403   void pingBuddy();
404   void pingCheckHandler();
405   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
406   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
407 #endif
408   chkpTable[0] = NULL;
409   chkpTable[1] = NULL;
410   maxIter = -1;
411   recvIterCount = 0;
412   localDecided = false;
413   softFailureInjected = false;
414   if(killFlag == 2){
415     localSeed = failureSeed;
416     thisProxy[CkMyPe()].generateFailure();
417     
418     if(SMTBF!=-1 && CmiMyPartition()==1 && CkMyPe()==0){
419       softLocalSeed = failureSeed*2;
420       thisProxy[CkMyPe()].generateSoftFailure();
421     }
422
423   }
424 }
425
426 void CkMemCheckPT::replicaInjectFailure(){
427   char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
428   CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
429   CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
430 }
431
432 void CkMemCheckPT::generateFailure(){
433   int rand1 = rand_r(&localSeed);
434   int rand2 = rand_r(&localSeed);
435   int rand3 = rand_r(&localSeed);
436   int next_pe = (rand1)%CkNumPes();
437   if(next_pe == 0){
438     next_pe = 1;
439   }
440   int next_partition = (rand2)%2;
441   double sec;
442   if(strcmp(failureDist,"E")==0)
443     sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
444   else if(strcmp(failureDist,"W")==0)
445     sec = alpha*pow(-log(1.0f - ((double)rand3)/(long long int)(RAND_MAX)),1/beta);
446   if(next_pe == CmiMyPe()&& next_partition == CmiMyPartition()){
447     killTime = CmiWallTimer()+sec;
448     printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
449     CcdCallFnAfter(killLocal,NULL,sec*1000);
450   }
451 }
452
453 void CkMemCheckPT::generateSoftFailure(){
454   int rand = rand_r(&softLocalSeed);
455   double sec;
456   if(strcmp(failureDist,"E")==0){
457     sec = -log(1.0f - ((double)rand)/(long long int)(RAND_MAX))*SMTBF;
458   }
459   else{
460     sec = s_alpha*pow(-log(1.0f - ((double)rand)/(long long int)(RAND_MAX)),1/beta);
461   }
462   printf("[%d][%d] inject soft failure after %.6lf s (MEMCKPT)\n",CmiMyPartition(),CkMyPe(),sec);
463   CcdCallFnAfter(injectSoftFailure,NULL,sec*1000);
464 }
465
466 CkMemCheckPT::~CkMemCheckPT()
467 {
468   int len = ckTable.length();
469   for (int i=0; i<len; i++) {
470     delete ckTable[i];
471   }
472 }
473
474 void CkMemCheckPT::pup(PUP::er& p) 
475
476   CBase_CkMemCheckPT::pup(p); 
477   p|cpStarter;
478   p|thisFailedPe;
479   p|failedPes;
480   p|ckCheckPTGroupID;           // recover global variable
481   p|cpCallback;                 // store callback
482   p|where;                      // where to checkpoint
483   p|peCount;
484   p|localSeed;
485   p|softLocalSeed;
486   if (p.isUnpacking()) {
487     recvCount = 0;
488 #if CMK_CONVERSE_MPI
489     void pingBuddy();
490     void pingCheckHandler();
491     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
492     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
493 #endif
494     maxIter = -1;
495     recvIterCount = 0;
496     localDecided = false;
497   }
498 }
499
500 void CkMemCheckPT::getIter(){
501   localDecided = true;
502   localMaxIter = maxIter+1;
503   if(CkMyPe()==0){
504     CkPrintf("local max iter is %d\n",localMaxIter);
505   }
506   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
507   int elemCount = CkCountChkpSyncElements();
508   if(CkMyPe()==0)
509     startTime = CmiWallTimer();
510   if(elemCount == 0){
511     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
512   }
513 }
514
515 //when one replica failes, decide the next chkp iter
516
517 void CkMemCheckPT::recvIter(int iter){
518   if(maxIter<iter){
519     maxIter=iter;
520   }
521 }
522
523 void CkMemCheckPT::recvMaxIter(int iter){
524   localDecided = false;
525   if(CkMyPe()==0)
526     CkPrintf("checkpoint iteration is %d\n",iter);
527   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
528 }
529
530 void CkMemCheckPT::reachChkpIter(){
531   recvIterCount++;
532   elemCount = CkCountChkpSyncElements();
533   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
534   if(recvIterCount == elemCount){
535     recvIterCount = 0;
536     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
537   }
538 }
539
540 void CkMemCheckPT::startChkp(){
541   if(CkInCheckpointing()){
542     return;
543   }
544   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
545   CkStartMemCheckpoint(cpCallback);
546 }
547
548 // called by checkpoint mgr to restore an array element
549 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
550 {
551 #if CMK_MEM_CHECKPOINT
552   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
553   // m->index.print();
554   PUP::fromMem p(m->packData);
555   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
556   CmiAssert(mgr);
557 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
558   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
559 #else
560   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
561 #endif
562
563   // find a list of array elements bound together
564   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
565   CmiAssert(elt);
566   CkLocRec_local *rec = elt->myRec;
567   CkVec<CkMigratable *> list;
568   mgr->migratableList(rec, list);
569   CmiAssert(list.length() > 0);
570   for (int l=0; l<list.length(); l++) {
571     elt = (ArrayElement *)list[l];
572     elt->budPEs[0] = m->bud1;
573     elt->budPEs[1] = m->bud2;
574     //    reset, may not needed now
575     // for now.
576     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
577       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
578       if (c) c->redNo = 0;
579     }
580   }
581 #endif
582 }
583
584 // return 1 if pe is a crashed processor
585 int CkMemCheckPT::isFailed(int pe)
586 {
587   for (int i=0; i<failedPes.length(); i++)
588     if (failedPes[i] == pe) return 1;
589   return 0;
590 }
591
592 // add pe into history list of all failed processors
593 void CkMemCheckPT::failed(int pe)
594 {
595   if (isFailed(pe)) return;
596   failedPes.push_back(pe);
597 }
598
599 int CkMemCheckPT::totalFailed()
600 {
601   return failedPes.length();
602 }
603
604 // create an checkpoint entry for array element of aid with index.
605 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
606 {
607   // error check, no duplicate
608   int idx, len = ckTable.size();
609   for (idx=0; idx<len; idx++) {
610     CkCheckPTInfo *entry = ckTable[idx];
611     if (index == entry->index) {
612       if (loc == entry->locMgr) {
613         // bindTo array elements
614         return;
615       }
616       // for array inheritance, the following check may fail
617       // because ArrayElement constructor of all superclasses are called
618       if (aid == entry->aid) {
619         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
620         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
621       }
622     }
623   }
624   CkCheckPTInfo *newEntry;
625   if (where == CkCheckPoint_inMEM)
626     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
627   else
628     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
629   ckTable.push_back(newEntry);
630   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
631 }
632
633 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
634 {
635 #if !CMK_CHKP_ALL       
636   int buddy = msg->bud1;
637   if (buddy == CkMyPe()) buddy = msg->bud2;
638   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
639   recvData(msg);
640   // ack
641   thisProxy[buddy].gotData();
642 #else
643   chkpTable[0] = NULL;
644   chkpTable[1] = NULL;
645   recvArrayCheckpoint(msg);
646   thisProxy[msg->bud2].gotData();
647 #endif
648 }
649
650 void CkMemCheckPT::chkpLocalStart()
651 {
652   CpvAccess(localStarted) = 1;
653 }
654
655 // loop through my checkpoint table and ask checkpointed array elements
656 // to send me checkpoint data.
657 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
658 void CkMemCheckPT::doItNow(int starter)
659 {
660   checkpointed = 1;
661   //cpCallback = cb;
662   cpStarter = starter;
663   inCheckpointing = 1;
664   if (CkMyPe() == cpStarter) {
665     startTime = CmiWallTimer();
666     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
667   }
668 #if CMK_CONVERSE_MPI
669   if(CmiNumPartition()==1)
670 #endif    
671   {
672 #if !CMK_CHKP_ALL
673     int len = ckTable.length();
674     for (int i=0; i<len; i++) {
675       CkCheckPTInfo *entry = ckTable[i];
676       // always let the bigger number processor send request
677       //if (CkMyPe() < entry->pNo) continue;
678       // always let the smaller number processor send request, may on same proc
679       if (!isMaster(entry->pNo)) continue;
680       // call inmem_checkpoint to the array element, ask it to send
681       // back checkpoint data via recvData().
682       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
683       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
684     }
685     // if my table is empty, then I am done
686     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
687 #else
688     startArrayCheckpoint();
689 #endif
690     // pack and send proc level data
691     sendProcData();
692   }
693 #if CMK_CONVERSE_MPI
694   else
695   {
696     startCheckpoint();
697   }
698 #endif
699 }
700
701 class MemElementPacker : public CkLocIterator{
702   private:
703     //    CkLocMgr *locMgr;
704     PUP::er &p;
705     std::map<CkHashCode,CkLocation> arrayMap;
706   public:
707     MemElementPacker(PUP::er &p_):p(p_){};
708     void addLocation(CkLocation &loc){
709       CkArrayIndexMax idx = loc.getIndex();
710       arrayMap[idx.hash()] = loc;
711     }
712     void writeCheckpoint(){
713       std::map<CkHashCode, CkLocation>::iterator it;
714       for(it = arrayMap.begin();it!=arrayMap.end();it++){
715         CkLocation loc = it->second;
716         CkLocMgr *locMgr = loc.getManager();
717         CkArrayIndexMax idx = loc.getIndex();
718         CkGroupID gID = locMgr->ckGetGroupID();
719         p|gID;
720         p|idx;
721         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
722       }
723     }
724 };
725
726 void pupAllElements(PUP::er &p){
727 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
728   int numElements;
729   if(!p.isUnpacking()){
730     numElements = CkCountArrayElements();
731   }
732   p | numElements;
733   if(!p.isUnpacking()){
734     MemElementPacker packer(p);
735     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
736     packer.writeCheckpoint();
737   }
738 #endif
739 }
740
741 void CkMemCheckPT::startArrayCheckpoint(){
742 #if CMK_CHKP_ALL
743   int size;
744   {
745     PUP::sizer psizer;
746     pupAllElements(psizer);
747     size = psizer.size();
748   }
749   int packSize = size/sizeof(double)+1;
750   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
751   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
752   msg->len = size;
753   msg->cp_flag = 1;
754   int budPEs[2];
755   msg->bud1=CkMyPe();
756   msg->bud2=ChkptOnPe(CkMyPe());
757   {
758     PUP::toMem p(msg->packData);
759     pupAllElements(p);
760   }
761   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
762   if(chkpTable[0]) delete chkpTable[0];
763   chkpTable[0] = msg;
764   //send the checkpoint to my 
765   recvCount++;
766 #endif
767 }
768
769 void CkMemCheckPT::startCheckpoint(){
770 #if CMK_CONVERSE_MPI
771   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
772     CkPrintf("in start checkpointing!!!!\n");
773   int size;
774   {
775     PUP::sizer p;
776     _handleProcData(p,CmiFalse);
777     size = p.size();
778   }
779   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
780   procMsg->len = size;
781   procMsg->cp_flag = 1;
782   {
783     PUP::toMem p(procMsg->packData);
784     _handleProcData(p,CmiFalse);
785   }
786   int pointer = CpvAccess(curPointer);
787   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
788   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
789
790   {
791     PUP::sizer psizer;
792     pupAllElements(psizer);
793     size = psizer.size();
794   }
795   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
796   msg->len = size;
797   msg->cp_flag = 1;
798   int checksum;
799   {
800     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
801       PUP::checker p(msg->packData);
802       pupAllElements(p);
803       checksum = p.getChecksum();
804     }else{  
805 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
806       PUP::toMem p(msg->packData);
807       pupAllElements(p);
808     }
809   }
810   pointer = CpvAccess(curPointer);
811   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
812   CpvAccess(chkpBuf)[pointer] = msg;
813   
814   if(CkMyPe()==0)
815     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
816   
817   if(CkReplicaAlive()==1){
818     CpvAccess(recvdLocal) = 1;
819     if(CpvAccess(use_checksum)){
820       CpvAccess(localChecksum) = checksum;
821       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
822       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
823       //only one reaplica will send
824       if(CmiMyPartition()==0){
825         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
826         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
827       }
828     }else{
829       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
830       CkPackMessage(&env);
831       if(CmiMyPartition()==0){
832         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
833         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
834       }
835     }
836     if(CmiMyPartition()==0){
837       notifyReplica = 1;
838       thisProxy[CkMyPe()].doneComparison(true);
839     }
840   }
841   if(CpvAccess(recvdRemote)==1){
842     //only partition 1 will do it
843     //compare the checkpoint 
844     int size = CpvAccess(chkpBuf)[pointer]->len;
845     if(CpvAccess(use_checksum)){
846       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
847         thisProxy[CkMyPe()].doneComparison(true);
848       }
849       else{
850         thisProxy[CkMyPe()].doneComparison(true);
851         //thisProxy[CkMyPe()].doneComparison(false);
852       }
853     }else{
854       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
855         thisProxy[CkMyPe()].doneComparison(true);
856       }
857       else{
858         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
859         thisProxy[CkMyPe()].doneComparison(true);
860         //thisProxy[CkMyPe()].doneComparison(false);
861       }
862     }
863     if(CkMyPe()==0)
864       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
865   }
866   else{
867     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
868       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
869       if(CpvAccess(remoteReady)==1){
870         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
871         {       
872           int pointer = CpvAccess(curPointer);
873           //send the proc data
874           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
875           procMsg->pointer = pointer;
876           envelope * env = (envelope *)(UsrToEnv(procMsg));
877           CkPackMessage(&env);
878           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
879           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
880           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
881             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
882         }
883         //send the array checkpoint data
884         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
885         msg->pointer = CpvAccess(curPointer);
886         envelope * env = (envelope *)(UsrToEnv(msg));
887         CkPackMessage(&env);
888         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
889         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
890         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
891           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
892       }else{
893         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
894           int pointer = CpvAccess(curPointer);
895           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
896           (CpvAccess(recoverProcBuf))->pointer = pointer;
897         }
898         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
899         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
900         CpvAccess(localReady) = 1;
901       }
902       //can continue work, no need to wait for my replica
903       int _ret = 1;
904       notifyReplica = 1;
905       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
906       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
907     }
908   }
909 #endif
910 }
911
912 void CkMemCheckPT::doneComparison(bool ret){
913   int _ret = 1;
914   if(!ret){
915     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
916     _ret = 0;
917   }else{
918     _ret = 1;
919   }
920   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
921   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
922 }
923
924 void CkMemCheckPT::doneRComparison(int ret){
925   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
926   if(ret==CkNumPes()&&!softFailureInjected){
927     CpvAccess(localChkpDone) = 1;
928     if(CpvAccess(remoteChkpDone) ==1){
929       thisProxy.doneBothComparison();
930     }else{
931       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
932     }
933   }
934   else{
935     ret = 0;
936     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
937     startTime = CmiWallTimer();
938     thisProxy.RollBack();
939   }
940   if(notifyReplica == 0){
941     //notify the replica am done
942     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
943     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
944     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
945     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
946     notifyReplica = 1;
947   }
948 }
949
950 void CkMemCheckPT::doneBothComparison(){
951   CpvAccess(recvdRemote) = 0;
952   CpvAccess(remoteReady) = 0;
953   CpvAccess(localReady) = 0;
954   CpvAccess(recvdLocal) = 0;
955   CpvAccess(localChkpDone) = 0;
956   CpvAccess(remoteChkpDone) = 0;
957   CpvAccess(remoteStarted) = 0;
958   CpvAccess(localStarted) = 0;
959   CpvAccess(_remoteCrashedNode) = -1;
960   CkMemCheckPT::replicaAlive = 1;
961   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
962   CpvAccess(curPointer)^=1;
963   inCheckpointing = 0;
964   notifyReplica = 0;
965   if(CkMyPe() == 0){
966     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
967   }
968   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
969 }
970
971 void CkMemCheckPT::RollBack(){
972   //restore group data
973   checkpointed = 0;
974   inRestarting = 1;
975   int pointer = CpvAccess(curPointer)^1;//use the previous one
976   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
977   PUP::fromMem p(chkpMsg->packData);    
978
979   //destroy array elements
980   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
981   int numGroups = CkpvAccess(_groupIDTable)->size();
982   for(int i=0;i<numGroups;i++) {
983     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
984     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
985     obj->flushStates();
986     obj->ckJustMigrated();
987   }
988   //restore array elements
989
990   int numElements;
991   p|numElements;
992
993   if(p.isUnpacking()){
994     for(int i=0;i<numElements;i++){
995       CkGroupID gID;
996       CkArrayIndex idx;
997       p|gID;
998       p|idx;
999       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1000       CmiAssert(mgr);
1001       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1002     }
1003   }
1004   
1005   softFailureInjected = false;
1006   CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
1007   contribute(cb);
1008 }
1009
1010   void CkMemCheckPT::notifyReplicaDie(int pe){
1011     replicaAlive = 0;
1012     CpvAccess(_remoteCrashedNode) = pe;
1013   }
1014
1015   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
1016   {
1017 #if CMK_CHKP_ALL
1018     int idx = 1;
1019     if(msg->bud1 == CkMyPe()){
1020       idx = 0;
1021     }
1022     int isChkpting = msg->cp_flag;
1023     if(isChkpting == 1){
1024       if(chkpTable[idx]) delete chkpTable[idx];
1025     }
1026     chkpTable[idx] = msg;
1027     if(isChkpting){
1028       recvCount++;
1029       if(recvCount == 2){
1030         if (where == CkCheckPoint_inMEM) {
1031           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1032         }
1033         else if (where == CkCheckPoint_inDISK) {
1034           // another barrier for finalize the writing using fsync
1035           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1036           contribute(0,NULL,CkReduction::sum_int,localcb);
1037         }
1038         else
1039           CmiAbort("Unknown checkpoint scheme");
1040         recvCount = 0;
1041       }
1042     }
1043 #endif
1044   }
1045
1046   // don't handle array elements
1047   static inline void _handleProcData(PUP::er &p, CmiBool create)
1048   {
1049     // save readonlys, and callback BTW
1050     CkPupROData(p);
1051
1052     // save mainchares 
1053     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
1054
1055 #ifndef CMK_CHARE_USE_PTR
1056     // save non-migratable chare
1057     CkPupChareData(p);
1058 #endif
1059
1060     // save groups into Groups.dat
1061     CkPupGroupData(p,create);
1062
1063     // save nodegroups into NodeGroups.dat
1064     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
1065   }
1066
1067   void CkMemCheckPT::sendProcData()
1068   {
1069     // find out size of buffer
1070     int size;
1071     {
1072       PUP::sizer p;
1073       _handleProcData(p,CmiTrue);
1074       size = p.size();
1075     }
1076     int packSize = size;
1077     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1078     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1079     {
1080       PUP::toMem p(msg->packData);
1081       _handleProcData(p,CmiTrue);
1082     }
1083     msg->pe = CkMyPe();
1084     msg->len = size;
1085     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1086     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1087   }
1088
1089   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1090   {
1091     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1092     CpvAccess(procChkptBuf) = msg;
1093     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1094     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1095   }
1096
1097   // ArrayElement call this function to give us the checkpointed data
1098   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1099   {
1100     int len = ckTable.length();
1101     int idx;
1102     for (idx=0; idx<len; idx++) {
1103       CkCheckPTInfo *entry = ckTable[idx];
1104       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1105     }
1106     CkAssert(idx < len);
1107     int isChkpting = msg->cp_flag;
1108     ckTable[idx]->updateBuffer(msg);
1109     if (isChkpting) {
1110       // all my array elements have returned their inmem data
1111       // inform starter processor that I am done.
1112       recvCount ++;
1113       if (recvCount == ckTable.length()) {
1114         if (where == CkCheckPoint_inMEM) {
1115           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1116         }
1117         else if (where == CkCheckPoint_inDISK) {
1118           // another barrier for finalize the writing using fsync
1119           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1120           contribute(0,NULL,CkReduction::sum_int,localcb);
1121         }
1122         else
1123           CmiAbort("Unknown checkpoint scheme");
1124         recvCount = 0;
1125       } 
1126     }
1127   }
1128
1129   // only used in disk checkpointing
1130   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1131   {
1132     delete m;
1133 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1134     system("sync");
1135 #endif
1136     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1137   }
1138
1139   // only is called on cpStarter when checkpoint is done
1140   void CkMemCheckPT::cpFinish()
1141   {
1142     CmiAssert(CkMyPe() == cpStarter);
1143     peCount++;
1144     // now that all processors have finished, activate callback
1145     if (peCount == 2) 
1146     {
1147       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1148       peCount = 0;
1149       thisProxy.report();
1150     }
1151   }
1152
1153   // for debugging, report checkpoint info
1154   void CkMemCheckPT::report()
1155   {
1156     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1157     inCheckpointing = 0;
1158 #if !CMK_CHKP_ALL
1159     int objsize = 0;
1160     int len = ckTable.length();
1161     for (int i=0; i<len; i++) {
1162       CkCheckPTInfo *entry = ckTable[i];
1163       CmiAssert(entry);
1164       objsize += entry->getSize();
1165     }
1166     CmiAssert(CpvAccess(procChkptBuf));
1167     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1168 #else
1169     if(CkMyPe()==0)
1170       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1171 #endif
1172   }
1173
1174   /*****************************************************************************
1175     RESTART Procedure
1176    *****************************************************************************/
1177
1178   // master processor of two buddies
1179   inline int CkMemCheckPT::isMaster(int buddype)
1180   {
1181 #if 0
1182     int mype = CkMyPe();
1183     //CkPrintf("ismaster: %d %d\n", pe, mype);
1184     if (CkNumPes() - totalFailed() == 2) {
1185       return mype > buddype;
1186     }
1187     for (int i=1; i<CkNumPes(); i++) {
1188       int me = (buddype+i)%CkNumPes();
1189       if (isFailed(me)) continue;
1190       if (me == mype) return 1;
1191       else return 0;
1192     }
1193     return 0;
1194 #else
1195     // smaller one
1196     int mype = CkMyPe();
1197     //CkPrintf("ismaster: %d %d\n", pe, mype);
1198     if (CkNumPes() - totalFailed() == 2) {
1199       return mype < buddype;
1200     }
1201 #if NODE_CHECKPOINT
1202     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1203     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1204 #else
1205       for (int i=1; i<CkNumPes(); i++) {
1206 #endif
1207         int me = (mype+i)%CkNumPes();
1208         if (isFailed(me)) continue;
1209         if (me == buddype) return 1;
1210         else return 0;
1211       }
1212       return 0;
1213 #endif
1214     }
1215
1216
1217
1218 #if 0
1219     // helper class to pup all elements that belong to same ckLocMgr
1220     class ElementDestoryer : public CkLocIterator {
1221       private:
1222         CkLocMgr *locMgr;
1223       public:
1224         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1225         void addLocation(CkLocation &loc) {
1226           CkArrayIndex idx=loc.getIndex();
1227           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1228           loc.destroy();
1229         }
1230     };
1231 #endif
1232
1233     // restore the bitmap vector for LB
1234     void CkMemCheckPT::resetLB(int diepe)
1235     {
1236 #if CMK_LBDB_ON
1237       int i;
1238       char *bitmap = new char[CkNumPes()];
1239       // set processor available bitmap
1240       get_avail_vector(bitmap);
1241       for (i=0; i<failedPes.length(); i++)
1242         bitmap[failedPes[i]] = 0; 
1243       bitmap[diepe] = 0;
1244
1245 #if CK_NO_PROC_POOL
1246       set_avail_vector(bitmap);
1247 #endif
1248
1249       // if I am the crashed pe, rebuild my failedPEs array
1250       if (CkMyNode() == diepe)
1251         for (i=0; i<CkNumPes(); i++) 
1252           if (bitmap[i]==0) failed(i);
1253
1254       delete [] bitmap;
1255 #endif
1256     }
1257
1258     static double restartT;
1259
1260     // in case when failedPe dies, everybody go through its checkpoint table:
1261     // destory all array elements
1262     // recover lost buddies
1263     // reconstruct all array elements from check point data
1264     // called on all processors
1265     void CkMemCheckPT::restart(int diePe)
1266     {
1267 #if CMK_MEM_CHECKPOINT
1268       thisFailedPe = diePe;
1269       double curTime = CmiWallTimer();
1270       if (CkMyPe() == thisFailedPe){
1271         restartT = CmiWallTimer();
1272         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1273       }
1274       stage = (char*)"resetLB";
1275       startTime = curTime;
1276       if (CkMyPe() == thisFailedPe)
1277         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1278
1279 //#if CK_NO_PROC_POOL
1280 //      failed(diePe);  // add into the list of failed pes
1281 //#endif
1282
1283 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1284
1285       inRestarting = 1;
1286
1287       // disable load balancer's barrier
1288       //if (CkMyPe() != diePe) resetLB(diePe);
1289
1290       CKLOCMGR_LOOP(mgr->startInserting(););
1291
1292
1293       if(CmiNumPartition()==1){
1294         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1295       }else{
1296         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1297         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1298       }
1299 #endif
1300     }
1301
1302     // loally remove all array elements
1303     void CkMemCheckPT::removeArrayElements()
1304     {
1305 #if CMK_MEM_CHECKPOINT
1306       int len = ckTable.length();
1307       double curTime = CmiWallTimer();
1308       if (CkMyPe() == thisFailedPe) 
1309         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1310       stage = (char*)"removeArrayElements";
1311       startTime = curTime;
1312
1313       //  if (cpCallback.isInvalid()) 
1314       //          CkPrintf("invalid pe %d\n",CkMyPe());
1315       //          CkAbort("Didn't set restart callback\n");;
1316       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1317
1318       // get rid of all buffering and remote recs
1319       // including destorying all array elements
1320 #if CK_NO_PROC_POOL  
1321       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1322 #else
1323       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1324 #endif
1325       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1326 #endif
1327     }
1328
1329     // flush state in reduction manager
1330     void CkMemCheckPT::resetReductionMgr()
1331     {
1332       if (CkMyPe() == thisFailedPe) 
1333         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1334       stage = (char *)"resetReductionMgr";
1335       int numGroups = CkpvAccess(_groupIDTable)->size();
1336       for(int i=0;i<numGroups;i++) {
1337         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1338         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1339         obj->flushStates();
1340         obj->ckJustMigrated();
1341       }
1342
1343       //reset CmiReduce
1344       CmiResetReductions();
1345
1346       // reset again
1347       //CpvAccess(_qd)->flushStates();
1348       if(CmiNumPartition()==1){
1349         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1350       }
1351       else{
1352         if (CkMyPe() == thisFailedPe) 
1353           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1354         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1355       }
1356     }
1357
1358     // recover the lost buddies
1359     void CkMemCheckPT::recoverBuddies()
1360     {
1361       int idx;
1362       int len = ckTable.length();
1363       // ready to flush reduction manager
1364       // cannot be CkMemCheckPT::restart because destory will modify states
1365       double curTime = CmiWallTimer();
1366       if (CkMyPe() == thisFailedPe)
1367         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1368       stage = (char *)"recoverBuddies";
1369       if (CkMyPe() == thisFailedPe)
1370         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1371       startTime = curTime;
1372
1373       // recover buddies
1374       expectCount = 0;
1375 #if !CMK_CHKP_ALL
1376       for (idx=0; idx<len; idx++) {
1377         CkCheckPTInfo *entry = ckTable[idx];
1378         if (entry->pNo == thisFailedPe) {
1379 #if CK_NO_PROC_POOL
1380           // find a new buddy
1381           int budPe = BuddyPE(CkMyPe());
1382 #else
1383           int budPe = thisFailedPe;
1384 #endif
1385           CkArrayCheckPTMessage *msg = entry->getCopy();
1386           msg->bud1 = budPe;
1387           msg->bud2 = CkMyPe();
1388           msg->cp_flag = 0;            // not checkpointing
1389           thisProxy[budPe].recoverEntry(msg);
1390           expectCount ++;
1391         }
1392       }
1393 #else
1394       //send to failed pe
1395       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1396 #if CK_NO_PROC_POOL
1397         // find a new buddy
1398         int budPe = BuddyPE(CkMyPe());
1399 #else
1400         int budPe = thisFailedPe;
1401 #endif
1402         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1403         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1404         msg->cp_flag = 0;            // not checkpointing
1405         msg->bud1 = budPe;
1406         msg->bud2 = CkMyPe();
1407         thisProxy[budPe].recoverEntry(msg);
1408         expectCount ++;
1409       }
1410 #endif
1411
1412       if (expectCount == 0) {
1413         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1414       }
1415       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1416     }
1417
1418     void CkMemCheckPT::gotData()
1419     {
1420       ackCount ++;
1421       if (ackCount == expectCount) {
1422         ackCount = 0;
1423         expectCount = -1;
1424         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1425         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1426       }
1427     }
1428
1429     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1430     {
1431
1432       for (int i=0; i<n; i++) {
1433         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1434         mgr->updateLocation(idx[i], nowOnPe);
1435       }
1436       thisProxy[nowOnPe].gotReply();
1437     }
1438
1439     // restore array elements
1440     void CkMemCheckPT::recoverArrayElements()
1441     {
1442       double curTime = CmiWallTimer();
1443       int len = ckTable.length();
1444       if (CkMyPe() == thisFailedPe)
1445         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1446       stage = (char *)"recoverArrayElements";
1447       startTime = curTime;
1448       int flag = 0;
1449       // recover all array elements
1450       int count = 0;
1451
1452 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1453       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1454       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1455 #endif
1456
1457 #if !CMK_CHKP_ALL
1458       for (int idx=0; idx<len; idx++)
1459       {
1460         CkCheckPTInfo *entry = ckTable[idx];
1461 #if CK_NO_PROC_POOL
1462         // the bigger one will do 
1463         //    if (CkMyPe() < entry->pNo) continue;
1464         if (!isMaster(entry->pNo)) continue;
1465 #else
1466         // smaller one do it, which has the original object
1467         if (CkMyPe() == entry->pNo+1 || 
1468             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1469 #endif
1470         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1471
1472         entry->updateBuddy(CkMyPe(), entry->pNo);
1473         CkArrayCheckPTMessage *msg = entry->getCopy();
1474         // gzheng
1475         //thisProxy[CkMyPe()].inmem_restore(msg);
1476         inmem_restore(msg);
1477 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1478         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1479         int homePe = mgr->homePe(msg->index);
1480         if (homePe != CkMyPe()) {
1481           gmap[homePe].push_back(msg->locMgr);
1482           imap[homePe].push_back(msg->index);
1483         }
1484 #endif
1485         CkFreeMsg(msg);
1486         count ++;
1487       }
1488 #else
1489       char * packData;
1490       if(CmiNumPartition()==1){
1491         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1492         packData = (char *)msg->packData;
1493       }
1494       else{
1495         int pointer = CpvAccess(curPointer);
1496         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1497         packData = msg->packData;
1498       }
1499 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1500       recoverAll(packData,gmap,imap);
1501 #else
1502       recoverAll(packData);
1503 #endif
1504 #endif
1505 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1506       for (int i=0; i<CkNumPes(); i++) {
1507         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1508           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1509           flag++;       
1510         }
1511       }
1512       delete [] imap;
1513       delete [] gmap;
1514 #endif
1515       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1516       //if (CkMyPe() == thisFailedPe)
1517
1518       CKLOCMGR_LOOP(mgr->doneInserting(););
1519
1520       // _crashedNode = -1;
1521       CpvAccess(_crashedNode) = -1;
1522 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1523       if (CkMyPe() == 0)
1524         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1525 #else
1526       if(flag == 0)
1527       {
1528         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1529       }
1530 #endif
1531     }
1532
1533     void CkMemCheckPT::gotReply(){
1534       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1535     }
1536
1537     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1538 #if CMK_CHKP_ALL
1539       PUP::fromMem p(packData);
1540       int numElements;
1541       p|numElements;
1542       if(p.isUnpacking()){
1543         for(int i=0;i<numElements;i++){
1544           CkGroupID gID;
1545           CkArrayIndex idx;
1546           p|gID;
1547           p|idx;
1548           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1549           int homePe = mgr->homePe(idx);
1550 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1551           mgr->resume(idx,p,CmiTrue,CmiTrue);
1552 #else
1553           if(CmiNumPartition()==1)
1554             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1555           else{
1556             if(CkMyPe()==thisFailedPe){
1557               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1558             }
1559             else{
1560               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1561             }
1562           }
1563 #endif
1564 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1565           homePe = mgr->homePe(idx);
1566           if (homePe != CkMyPe()) {
1567             gmap[homePe].push_back(gID);
1568             imap[homePe].push_back(idx);
1569           }
1570 #endif
1571         }
1572       }
1573 #endif
1574     }
1575
1576
1577     // on every processor
1578     // turn load balancer back on
1579     void CkMemCheckPT::finishUp()
1580     {
1581       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1582       //CKLOCMGR_LOOP(mgr->doneInserting(););
1583
1584       if (CkMyPe() == thisFailedPe)
1585       {
1586         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1587         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1588         fflush(stdout);
1589       }
1590
1591
1592       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1593       inRestarting = 0;
1594       maxIter = -1;
1595 #if CMK_CONVERSE_MPI    
1596       if(CmiNumPartition()!=1){
1597         CpvAccess(recvdProcChkp) = 0;
1598         CpvAccess(recvdArrayChkp) = 0;
1599         CpvAccess(curPointer)^=1;
1600         //notify my replica, restart is done
1601         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1602           CpvAccess(remoteStarted) =0;
1603           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1604           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1605           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1606         }
1607       }
1608       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1609         lastPingTime = CmiWallTimer();
1610         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1611       }
1612       //inject next failure
1613       if(killFlag==2){
1614         if(CkMyPe()==0){
1615           replicaInjectFailure();
1616         }
1617         thisProxy[CkMyPe()].generateFailure();
1618       }
1619 #endif
1620
1621 #if CK_NO_PROC_POOL
1622 #if NODE_CHECKPOINT
1623       int numnodes = CmiNumPhysicalNodes();
1624 #else
1625       int numnodes = CkNumPes();
1626 #endif
1627       if (numnodes-totalFailed() <=2) {
1628         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1629         _memChkptOn = 0;
1630       }
1631 #endif
1632     }
1633
1634     void CkMemCheckPT::recoverFromSoftFailure()
1635     {
1636       inRestarting = 0;
1637       maxIter = -1;
1638       CpvAccess(recvdRemote) = 0;
1639       CpvAccess(recvdLocal) = 0;
1640       CpvAccess(localChkpDone) = 0;
1641       CpvAccess(remoteChkpDone) = 0;
1642       CpvAccess(remoteReady) = 0;
1643       CpvAccess(localReady) = 0;
1644       inCheckpointing = 0;
1645       notifyReplica = 0;
1646       CpvAccess(remoteStarted) = 0;
1647       CpvAccess(localStarted) = 0;
1648       CpvAccess(_remoteCrashedNode) = -1;
1649       CkMemCheckPT::replicaAlive = 1;
1650       inCheckpointing = 0;
1651       notifyReplica = 0;
1652       if(CkMyPe() == 0){
1653         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1654       }
1655       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1656       if(CmiMyPartition()==1 && CkMyPe()==0){
1657         thisProxy[CkMyPe()].generateSoftFailure();
1658       }
1659     }
1660     // called only on 0
1661     void CkMemCheckPT::quiescence(CkCallback &cb)
1662     {
1663       static int pe_count = 0;
1664       pe_count ++;
1665       CmiAssert(CkMyPe() == 0);
1666       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1667       if (pe_count == CkNumPes()) {
1668         pe_count = 0;
1669         cb.send();
1670       }
1671     }
1672
1673     // User callable function - to start a checkpoint
1674     // callback cb is used to pass control back
1675     void CkStartMemCheckpoint(CkCallback &cb)
1676     {
1677 #if CMK_MEM_CHECKPOINT
1678       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1679       /*if (_memChkptOn == 0) {
1680         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1681         cb.send();
1682         return;
1683         }*/
1684       if (CkInRestarting()) {
1685         // trying to checkpointing during restart
1686         cb.send();
1687         return;
1688       }
1689       if(CkInCheckpointing())
1690         return;
1691       // store user callback and user data
1692       CkMemCheckPT::cpCallback = cb;
1693
1694
1695       //send to my replica that checkpoint begins 
1696       if(CkReplicaAlive()==1){
1697         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1698         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1699         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1700       }
1701       CpvAccess(localStarted) = 1;
1702       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1703       checkptMgr.chkpLocalStart();
1704       // broadcast to start check pointing
1705       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1706         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1707         checkptMgr.doItNow(CkMyPe());
1708       }
1709 #else
1710       // when mem checkpoint is disabled, invike cb immediately
1711       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1712       cb.send();
1713 #endif
1714     }
1715
1716     void CkRestartCheckPoint(int diePe)
1717     {
1718       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1719       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1720       // broadcast
1721       checkptMgr.restart(diePe);
1722     }
1723
1724     static int _diePE = -1;
1725
1726     // callback function used locally by ccs handler
1727     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1728     {
1729       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1730       CkRestartCheckPoint(_diePE);
1731     }
1732
1733
1734     // called on crashed PE
1735     static void restartBeginHandler(char *msg)
1736     {
1737       CmiFree(msg);
1738 #if CMK_MEM_CHECKPOINT
1739 #if CMK_USE_BARRIER
1740       if(CkMyPe()!=_diePE){
1741         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1742         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1743         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1744         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1745       }else{
1746         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1747         CkRestartCheckPointCallback(NULL, NULL);
1748       }
1749 #else
1750       static int count = 0;
1751       CmiAssert(CkMyPe() == _diePE);
1752       count ++;
1753       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1754         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1755         CkRestartCheckPointCallback(NULL, NULL);
1756         count = 0;
1757       }
1758 #endif
1759 #endif
1760     }
1761
1762     extern void _discard_charm_message();
1763     extern void _resume_charm_message();
1764
1765     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1766       return data;
1767     }
1768
1769     static void restartBcastHandler(char *msg)
1770     {
1771 #if CMK_MEM_CHECKPOINT
1772       // advance phase counter
1773       CkMemCheckPT::inRestarting = 1;
1774       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1775       // gzheng
1776       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1777
1778       if (CkMyPe()==_diePE)
1779         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1780
1781       // reset QD counters
1782       /*  gzheng
1783           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1784        */
1785
1786       /*  gzheng
1787           if (CkMyPe()==_diePE)
1788           CkRestartCheckPointCallback(NULL, NULL);
1789        */
1790       CmiFree(msg);
1791
1792       _resume_charm_message();
1793
1794       // reduction
1795       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1796       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1797 #if CMK_USE_BARRIER
1798       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1799 #else
1800       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1801 #endif 
1802       checkpointed = 0;
1803 #endif
1804     }
1805
1806     extern void _initDone();
1807
1808     bool compare(char * buf1, char *buf2){
1809       if(CkMyPe()==0)
1810         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1811       PUP::checker pchecker(buf1,buf2);
1812       pchecker.skip();
1813       int numElements;
1814       pchecker|numElements;
1815       for(int i=0;i<numElements;i++){
1816         CkGroupID gID;
1817         CkArrayIndex idx;
1818
1819         pchecker|gID;
1820         pchecker|idx;
1821
1822         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1823         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1824       }
1825       if(CkMyPe()==0)
1826         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1827       //int fault_num = pchecker.getFaultNum();
1828       bool result = pchecker.getResult();
1829       /*if(!result){
1830         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1831       }*/
1832       return result;
1833     }
1834     int getChecksum(char * buf){
1835       PUP::checker pchecker(buf);
1836       pchecker.skip();
1837       int numElements;
1838       pchecker|numElements;
1839 //      CkPrintf("num %d\n",numElements);
1840       for(int i=0;i<numElements;i++){
1841         CkGroupID gID;
1842         CkArrayIndex idx;
1843
1844         pchecker|gID;
1845         pchecker|idx;
1846
1847         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1848         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1849       }
1850       return pchecker.getChecksum();
1851     }
1852
1853     static void recvRemoteChkpHandler(char *msg){
1854       CpvAccess(remoteChkpDone) = 1;
1855       if(CpvAccess(use_checksum)){
1856         if(CkMyPe()==0)
1857           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1858         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1859         CpvAccess(recvdRemote) = 1;
1860         if(CpvAccess(recvdLocal)==1){
1861           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1862             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1863           }
1864           else{
1865             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1866             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1867           }
1868         }
1869       }else{
1870         envelope *env = (envelope *)msg;
1871         CkUnpackMessage(&env);
1872         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1873         if(CpvAccess(recvdLocal)==1){
1874           int pointer = CpvAccess(curPointer);
1875           int size = CpvAccess(chkpBuf)[pointer]->len;
1876           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1877             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1878           }else
1879           {
1880             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1881             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1882           }
1883           delete chkpMsg;
1884           if(CkMyPe()==0)
1885             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1886         }else{
1887           CpvAccess(recvdRemote) = 1;
1888           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1889           CpvAccess(buddyBuf) = chkpMsg;
1890         }
1891       }
1892     }
1893
1894     static void replicaRecoverHandler(char *msg){
1895       //fflush(stdout);
1896       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1897       CpvAccess(remoteChkpDone) = 1;
1898       if(CpvAccess(localChkpDone) == 1)
1899         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1900       else{  
1901         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1902         //CmiAbort("impossible state");
1903       }
1904       CmiFree(msg);
1905     }
1906
1907     
1908     static void replicaBeginFailureInjectionHandler(char * msg){
1909       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1910       checkptMgr.generateFailure();
1911       CmiFree(msg);
1912     }
1913     
1914     static void replicaChkpDoneHandler(char *msg){
1915       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1916       CpvAccess(remoteChkpDone) = 1;
1917       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1918       if(CpvAccess(localChkpDone) == 1)
1919         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1920       else  
1921         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1922       CmiFree(msg);
1923     }
1924
1925     static void replicaDieHandler(char * msg){
1926 #if CMK_CONVERSE_MPI
1927       //broadcast to every one in my replica
1928       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1929       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1930 #endif
1931     }
1932
1933     static void replicaChkpStartHandler(char * msg){
1934       if(CkMyPe()==0){
1935         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1936       }
1937       CpvAccess(remoteStarted) =1;
1938       if(CpvAccess(localStarted)==1){    
1939         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1940         checkptMgr.doItNow(0);
1941       }
1942     }
1943
1944     static void replicaDieBcastHandler(char *msg){
1945       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1946       CpvAccess(_remoteCrashedNode) = diePe;
1947       if(CkMyPe()==diePe){
1948         CmiPrintf("pe %d in replica world die\n",diePe);
1949         fflush(stdout);
1950       }
1951       
1952       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1953         find_spare_mpirank(diePe,CmiMyPartition()^1);
1954       }
1955
1956       //what if am already ready to checkpoint
1957       if(CpvAccess(resilience)!=1){
1958         CkMemCheckPT::replicaAlive = 0;
1959         if(CpvAccess(localStarted)==1){    
1960           if(CkMyPe()==0){
1961             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1962             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1963             checkptMgr.doItNow(0);
1964           }
1965         }
1966         //broadcast to my partition to get local max iter
1967         else{
1968           if(CkMyPe()==diePe){
1969             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
1970           }
1971           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1972         }
1973       }
1974
1975       CmiFree(msg);
1976     }
1977
1978     static void recoverRemoteProcDataHandler(char *msg){
1979       envelope *env = (envelope *)msg;
1980       CkUnpackMessage(&env);
1981       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1982
1983       //store the checkpoint
1984       int pointer = procMsg->pointer;
1985
1986
1987       if(CkMyPe()==CpvAccess(_crashedNode)){
1988         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1989         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1990         PUP::fromMem p(procMsg->packData);
1991         _handleProcData(p,CmiTrue);
1992         _initDone();
1993         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
1994       }
1995       else{
1996         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1997         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1998         //_handleProcData(p,CmiFalse);
1999       }
2000       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2001       CKLOCMGR_LOOP(mgr->startInserting(););
2002
2003       CpvAccess(recvdProcChkp) =1;
2004       if(CpvAccess(recvdArrayChkp)==1){
2005         _resume_charm_message();
2006         _diePE = CpvAccess(_crashedNode);
2007         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2008         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2009         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2010 #if CMK_USE_BARRIER
2011         if(CpvAccess(resilience)==1){
2012           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2013         }else
2014           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2015 #else
2016         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2017 #endif 
2018       }
2019     }
2020
2021     static void recoverRemoteArrayDataHandler(char *msg){
2022       envelope *env = (envelope *)msg;
2023       CkUnpackMessage(&env);
2024       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
2025
2026       //store the checkpoint
2027       int pointer = chkpMsg->pointer;
2028       CpvAccess(curPointer) = pointer;
2029       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
2030       CpvAccess(chkpBuf)[pointer] = chkpMsg;
2031       CpvAccess(recvdArrayChkp) =1;
2032       CkMemCheckPT::inRestarting = 1;
2033
2034       if(CkMyPe()==CpvAccess(_crashedNode))
2035         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
2036       
2037       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
2038         _resume_charm_message();
2039         _diePE = CpvAccess(_crashedNode);
2040         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
2041         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2042         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2043         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2044 #if CMK_USE_BARRIER
2045         if(CpvAccess(resilience)==1){
2046           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2047         }else
2048           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2049 #else
2050         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2051 #endif 
2052       }
2053     }
2054
2055     static void recvPhaseHandler(char * msg)
2056     {
2057       //decrease the phase number so that the crashed replica can communicate with the healthy one
2058       CpvAccess(_curRestartPhase)--;
2059       
2060       CkMemCheckPT::inRestarting = 1;
2061       CmiFree(msg);
2062       //notify the buddy in the replica now i can receive the checkpoint msg
2063       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
2064         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2065         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
2066         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
2067         //timer start
2068         if(CpvAccess(_crashedNode)==CkMyPe()){
2069           CkMemCheckPT::startTime = restartT = CmiWallTimer();
2070           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
2071         }
2072       }else{
2073         CpvAccess(curPointer)^=1;
2074         CkMemCheckPT::inRestarting = 1;
2075         _resume_charm_message();
2076         _diePE = CpvAccess(_crashedNode);
2077       }
2078     }
2079     
2080     static void askRecoverDataHandler(char * msg){
2081       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2082         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
2083       if(CpvAccess(resilience)!=1){
2084         CpvAccess(remoteReady)=1;
2085         if(CpvAccess(localReady)==1){
2086           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2087           {     
2088             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
2089             CkPackMessage(&env);
2090             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2091             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2092             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2093           }
2094           //send the array checkpoint data
2095           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2096           CkPackMessage(&env);
2097           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2098           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2099           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2100             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2101         }
2102       }else{
2103         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2104         int pointer = CpvAccess(curPointer)^1;
2105         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2106         procMsg->pointer = pointer;
2107         envelope * env = (envelope *)(UsrToEnv(procMsg));
2108         CkPackMessage(&env);
2109         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2110         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2111         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2112         
2113         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2114         arrayMsg->pointer = pointer;
2115         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2116         CkPackMessage(&env1);
2117         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2118         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2119         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2120       }
2121     }
2122     // called on crashed processor
2123     static void recoverProcDataHandler(char *msg)
2124     {
2125 #if CMK_MEM_CHECKPOINT
2126       int i;
2127       envelope *env = (envelope *)msg;
2128       CkUnpackMessage(&env);
2129       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2130       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2131       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2132       PUP::fromMem p(procMsg->packData);
2133       _handleProcData(p,CmiTrue);
2134
2135       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2136       // gzheng
2137       CKLOCMGR_LOOP(mgr->startInserting(););
2138
2139       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2140       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2141       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2142       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2143
2144       _initDone();
2145       //   CpvAccess(_qd)->flushStates();
2146       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2147 #endif
2148     }
2149     //for replica, got the phase number from my neighbor processor in the current partition
2150     static void askPhaseHandler(char *msg)
2151     {
2152 #if CMK_MEM_CHECKPOINT
2153       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2154       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2155       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2156       CmiSetHandler(msg, recvPhaseHandlerIdx);
2157       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2158 #endif
2159     }
2160     // called on its backup processor
2161     // get backup message buffer and sent to crashed processor
2162     static void askProcDataHandler(char *msg)
2163     {
2164 #if CMK_MEM_CHECKPOINT
2165       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2166       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2167       if (CpvAccess(procChkptBuf) == NULL)  {
2168         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2169         CkAbort("no checkpoint found");
2170       }
2171       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2172       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2173
2174       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2175
2176       CkPackMessage(&env);
2177       CmiSetHandler(env, recoverProcDataHandlerIdx);
2178       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2179       CpvAccess(procChkptBuf) = NULL;
2180       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2181 #endif
2182     }
2183
2184     // called on PE 0
2185     void qd_callback(void *m)
2186     {
2187       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2188       fflush(stdout);
2189       CkFreeMsg(m);
2190       if(CmiNumPartition()==1){
2191 #ifdef CMK_SMP
2192         for(int i=0;i<CmiMyNodeSize();i++){
2193           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2194           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2195           CmiSetHandler(msg, askProcDataHandlerIdx);
2196           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2197           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2198         }
2199         return;
2200 #endif
2201         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2202         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2203         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2204         CmiSetHandler(msg, askProcDataHandlerIdx);
2205         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2206         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2207       }
2208       else{
2209         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2210         CmiSetHandler(msg, recvPhaseHandlerIdx);
2211         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2212       }
2213     }
2214
2215     // on crashed node
2216     void CkMemRestart(const char *dummy, CkArgMsg *args)
2217     {
2218 #if CMK_MEM_CHECKPOINT
2219       _diePE = CmiMyNode();
2220       CpvAccess( _crashedNode )= CmiMyNode();
2221       CkMemCheckPT::inRestarting = 1;
2222       _discard_charm_message();
2223
2224       /*if(CmiMyRank()==0){
2225         CkCallback cb(qd_callback);
2226         CkStartQD(cb);
2227         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2228         }*/
2229       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2230       if(CmiNumPartition()==1){
2231         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2232         restartT = CmiWallTimer();
2233         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2234         fflush(stdout);
2235         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2236         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2237         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2238         CmiSetHandler(msg, askProcDataHandlerIdx);
2239         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2240         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2241       }
2242       else{
2243         CkCallback cb(qd_callback);
2244         CkStartQD(cb);
2245       }
2246 #else
2247       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2248 #endif
2249     }
2250
2251     // can be called in other files
2252     // return true if it is in restarting
2253     extern "C"
2254       int CkInRestarting()
2255       {
2256 #if CMK_MEM_CHECKPOINT
2257         if (CpvAccess( _crashedNode)!=-1) return 1;
2258         // gzheng
2259         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2260         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2261         return CkMemCheckPT::inRestarting;
2262 #else
2263         return 0;
2264 #endif
2265       }
2266
2267     extern "C"
2268       int CkReplicaAlive()
2269       {
2270         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2271         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2272         return CkMemCheckPT::replicaAlive;
2273
2274         /*if(CkMemCheckPT::replicaDead==1)
2275           return 0;
2276           else          
2277           return 1;*/
2278       }
2279
2280     extern "C"
2281       int CkInCheckpointing()
2282       {
2283         return CkMemCheckPT::inCheckpointing;
2284       }
2285
2286     extern "C"
2287       void CkSetInLdb(){
2288 #if CMK_MEM_CHECKPOINT
2289         CkMemCheckPT::inLoadbalancing = 1;
2290 #endif
2291       }
2292
2293     extern "C"
2294       int CkInLdb(){
2295 #if CMK_MEM_CHECKPOINT
2296         return CkMemCheckPT::inLoadbalancing;
2297 #endif
2298         return 0;
2299       }
2300
2301     extern "C"
2302       void CkResetInLdb(){
2303 #if CMK_MEM_CHECKPOINT
2304         CkMemCheckPT::inLoadbalancing = 0;
2305 #endif
2306       }
2307
2308     /*****************************************************************************
2309       module initialization
2310      *****************************************************************************/
2311
2312     static int arg_where = CkCheckPoint_inMEM;
2313
2314 #if CMK_MEM_CHECKPOINT
2315     void init_memcheckpt(char **argv)
2316     {
2317       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2318         arg_where = CkCheckPoint_inDISK;
2319       }
2320       CpvInitialize(int, use_checksum);
2321       CpvInitialize(int, resilience);
2322       CpvAccess(use_checksum)=0;
2323       CpvAccess(resilience)=0;//TODO should set a default resilience level
2324       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2325         CpvAccess(use_checksum)=1;
2326       }
2327       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2328         CpvAccess(resilience)=1;
2329       }
2330       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2331         CpvAccess(resilience)=2;
2332       }
2333       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2334         CpvAccess(resilience)=3;
2335       }
2336       // initiliazing _crashedNode variable
2337       CpvInitialize(int, _crashedNode);
2338       CpvInitialize(int, _remoteCrashedNode);
2339       CpvAccess(_crashedNode) = -1;
2340       CpvAccess(_remoteCrashedNode) = -1;
2341       init_FI(argv);
2342     }
2343 #endif
2344
2345     class CkMemCheckPTInit: public Chare {
2346       public:
2347         CkMemCheckPTInit(CkArgMsg *m) {
2348 #if CMK_MEM_CHECKPOINT
2349           if (arg_where == CkCheckPoint_inDISK) {
2350             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2351           }
2352
2353           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2354           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2355 #endif
2356         }
2357     };
2358
2359     static void notifyHandler(char *msg)
2360     {
2361 #if CMK_MEM_CHECKPOINT
2362       CmiFree(msg);
2363       /* immediately increase restart phase to filter old messages */
2364       CpvAccess(_curRestartPhase) ++;
2365       CpvAccess(_qd)->flushStates();
2366       _discard_charm_message();
2367
2368 #endif
2369     }
2370
2371     extern "C"
2372       void notify_crash(int node)
2373       {
2374 #ifdef CMK_MEM_CHECKPOINT
2375         CpvAccess( _crashedNode) = node;
2376 #ifdef CMK_SMP
2377         for(int i=0;i<CkMyNodeSize();i++){
2378           CpvAccessOther(_crashedNode,i)=node;
2379         }
2380 #endif
2381         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2382         CkMemCheckPT::inRestarting = 1;
2383
2384         // this may be in interrupt handler, send a message to reset QD
2385         int pe = CmiNodeFirst(CkMyNode());
2386         for(int i=0;i<CkMyNodeSize();i++){
2387           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2388           CmiSetHandler(msg, notifyHandlerIdx);
2389           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2390         }
2391 #endif
2392       }
2393
2394     extern "C" void (*notify_crash_fn)(int node);
2395
2396
2397 #if CMK_CONVERSE_MPI
2398     void buddyDieHandler(char *msg)
2399     {
2400 #if CMK_MEM_CHECKPOINT
2401       // notify
2402       CkMemCheckPT::inRestarting = 1;
2403       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2404       notify_crash(diepe);
2405       // send message to crash pe to let it restart
2406       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2407       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2408       int buddy = obj->BuddyPE(CmiMyPe());
2409       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2410       if (buddy == diepe)  {
2411         mpi_restart_crashed(diepe, newrank);
2412       }
2413 #endif
2414     }
2415
2416     void pingHandler(void *msg)
2417     {
2418       lastPingTime = CmiWallTimer();
2419       CmiFree(msg);
2420     }
2421
2422     void pingCheckHandler()
2423     {
2424 #if CMK_MEM_CHECKPOINT
2425       double now = CmiWallTimer();
2426       if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2427         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2428         int i, pe, buddy;
2429         // tell everyone the buddy dies
2430         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2431         for (i = 1; i < CmiNumPes(); i++) {
2432           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2433           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2434         }
2435         buddy = pe;
2436         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2437         fflush(stdout);
2438         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2439         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2440         CmiSetHandler(msg, buddyDieHandlerIdx);
2441         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2442         //send to everyone in the other world
2443         if(CmiNumPartition()!=1){
2444           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2445           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2446           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2447           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2448         }
2449       }
2450         else 
2451           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2452 #endif
2453       }
2454
2455       void pingBuddy()
2456       {
2457 #if CMK_MEM_CHECKPOINT
2458         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2459         if (obj) {
2460           int buddy = obj->BuddyPE(CkMyPe());
2461           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2462           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2463           CmiSetHandler(msg, pingHandlerIdx);
2464           CmiGetRestartPhase(msg) = 9999;
2465           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2466         }
2467         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2468 #endif
2469       }
2470 #endif
2471
2472       // initproc
2473       void CkRegisterRestartHandler( )
2474       {
2475 #if CMK_MEM_CHECKPOINT
2476         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2477         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2478         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2479         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2480         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2481         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2482
2483         //for replica
2484         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2485         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2486         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2487         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2488         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2489         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2490         replicaBeginFailureInjectionHandlerIdx = CkRegisterHandler((CmiHandler)replicaBeginFailureInjectionHandler);
2491         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2492         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2493         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2494         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2495
2496 #if CMK_CONVERSE_MPI
2497         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2498         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2499         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2500 #endif
2501
2502         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2503         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2504         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2505         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2506         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2507         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2508         CpvInitialize(int,curPointer);
2509         CpvInitialize(int,recvdLocal);
2510         CpvInitialize(int,localChkpDone);
2511         CpvInitialize(int,remoteChkpDone);
2512         CpvInitialize(int,remoteStarted);
2513         CpvInitialize(int,localStarted);
2514         CpvInitialize(int,localReady);
2515         CpvInitialize(int,remoteReady);
2516         CpvInitialize(int,recvdRemote);
2517         CpvInitialize(int,recvdProcChkp);
2518         CpvInitialize(int,localChecksum);
2519         CpvInitialize(int,remoteChecksum);
2520         CpvInitialize(int,recvdArrayChkp);
2521
2522         CpvAccess(procChkptBuf) = NULL;
2523         CpvAccess(buddyBuf) = NULL;
2524         CpvAccess(recoverProcBuf) = NULL;
2525         CpvAccess(recoverArrayBuf) = NULL;
2526         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2527         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2528         CpvAccess(chkpBuf)[0] = NULL;
2529         CpvAccess(chkpBuf)[1] = NULL;
2530         CpvAccess(localProcChkpBuf)[0] = NULL;
2531         CpvAccess(localProcChkpBuf)[1] = NULL;
2532
2533         CpvAccess(curPointer) = 0;
2534         CpvAccess(recvdLocal) = 0;
2535         CpvAccess(localChkpDone) = 0;
2536         CpvAccess(remoteChkpDone) = 0;
2537         CpvAccess(remoteStarted) = 0;
2538         CpvAccess(localStarted) = 0;
2539         CpvAccess(localReady) = 0;
2540         CpvAccess(remoteReady) = 0;
2541         CpvAccess(recvdRemote) = 0;
2542         CpvAccess(recvdProcChkp) = 0;
2543         CpvAccess(localChecksum) = 0;
2544         CpvAccess(remoteChecksum) = 0;
2545         CpvAccess(recvdArrayChkp) = 0;
2546
2547         notify_crash_fn = notify_crash;
2548
2549 #if ! CMK_CONVERSE_MPI
2550         // print pid to kill
2551         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2552         //  sleep(4);
2553 #endif
2554 #endif
2555       }
2556
2557
2558       extern "C"
2559         int CkHasCheckpoints()
2560         {
2561           return checkpointed;
2562         }
2563
2564       /// @todo: the following definitions should be moved to a separate file containing
2565       // structures and functions about fault tolerance strategies
2566
2567       /**
2568        *  * @brief: function for killing a process                                             
2569        *   */
2570 #ifdef CMK_MEM_CHECKPOINT
2571 #if CMK_HAS_GETPID
2572       void killLocal(void *_dummy,double curWallTime){
2573         if(CmiWallTimer()<killTime-1){
2574           printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2575           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2576         }else{ 
2577 #if CMK_CONVERSE_MPI
2578           if(!CkInCheckpointing()&&!CkInRestarting()){
2579             printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2580             CkDieNow();
2581           }else{
2582             //next failure
2583             if(killFlag == 2){
2584               //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
2585               //checkptMgr.generateFailure();
2586               //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->replicaInjectFailure();
2587               //delay for 2s 
2588               CcdCallFnAfter(killLocal,NULL,2*1000);        
2589             }
2590           }
2591 #else 
2592           kill(getpid(),SIGKILL);                                               
2593 #endif
2594         }              
2595       } 
2596 #else
2597       void killLocal(void *_dummy,double curWallTime){
2598         CmiAbort("kill() not supported!");
2599       }
2600 #endif
2601       void injectSoftFailure(void *_dummy,double curWallTime){
2602         if(!CkInCheckpointing()&&!CkInRestarting()){
2603           CkPrintf("soft failure injected\n");
2604           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->softFailureInjected = true;
2605         }else{
2606           //next failure
2607           CcdCallFnAfter(killLocal,NULL,2*1000);
2608         }
2609       }
2610 #endif
2611
2612 #ifdef CMK_MEM_CHECKPOINT
2613       /**
2614        * @brief: reads the file with the kill information
2615        */
2616       void readKillFile(){
2617         FILE *fp=fopen(killFile,"r");
2618         if(!fp){
2619           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2620           return;
2621         }
2622         int proc;
2623         double sec;
2624         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2625           if(proc == CkMyNode() && CkMyRank() == 0){
2626             killTime = CmiWallTimer()+sec;
2627             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2628             CcdCallFnAfter(killLocal,NULL,sec*1000);
2629           }
2630         }
2631         fclose(fp);
2632       }
2633
2634
2635 #if ! CMK_CONVERSE_MPI
2636       void CkDieNow()
2637       {
2638 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2639         // ignored for non-mpi version
2640         CmiPrintf("[%d] die now.\n", CmiMyPe());
2641         killTime = CmiWallTimer()+0.001;
2642         CcdCallFnAfter(killLocal,NULL,1);
2643 #endif
2644       }
2645 #endif
2646
2647 #endif
2648
2649 #include "CkMemCheckpoint.def.h"
2650
2651