resolve some memory leak issue, inject failure to certain pe for better control
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92 unsigned int failureSeed;
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // flag for failure distributiona
104 char * failureDist;
105 // Meantimr between failure
106 int MTBF;
107 int SMTBF;
108 //shape parameter for Weibull distribution
109 double beta;
110 double alpha;
111 double s_alpha;
112 // variable for storing the killing time
113 double killTime=0.0;
114 extern void killLocal(void *_dummy,double curWallTime);
115 extern void injectSoftFailure(void *_dummy,double curWallTime);
116 #endif
117
118 #ifdef CKLOCMGR_LOOP
119 #undef CKLOCMGR_LOOP
120 #endif
121 // loop over all CkLocMgr and do "code"
122 #define  CKLOCMGR_LOOP(code)    {       \
123   int numGroups = CkpvAccess(_groupIDTable)->size();    \
124   for(int i=0;i<numGroups;i++) {        \
125     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
126     if(obj->isLocMgr())  {      \
127       CkLocMgr *mgr = (CkLocMgr*)obj;   \
128       code      \
129     }   \
130   }     \
131 }
132
133 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
134 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
135 CpvDeclare(CkCheckPTMessage**, chkpBuf);
136 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
137 //store the checkpoint of the buddy to compare
138 //do not need the whole msg, can be the checksum
139 CpvDeclare(CkCheckPTMessage*, buddyBuf);
140 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
141 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
142 //pointer of the checkpoint going to be written
143 CpvDeclare(int, curPointer);
144 CpvDeclare(int, recvdRemote);
145 CpvDeclare(int, recvdLocal);
146 CpvDeclare(int, localChkpDone);
147 CpvDeclare(int, remoteChkpDone);
148 CpvDeclare(int, remoteStarted);
149 CpvDeclare(int, localStarted);
150 CpvDeclare(int, remoteReady);
151 CpvDeclare(int, localReady);
152 CpvDeclare(int, recvdArrayChkp);
153 CpvDeclare(int, recvdProcChkp);
154 CpvDeclare(int, localChecksum);
155 CpvDeclare(int, remoteChecksum);
156
157 bool compare(char * buf1, char * buf2);
158 int getChecksum(char * buf);
159 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
160 // Converse function handles
161 static int askPhaseHandlerIdx;
162 static int recvPhaseHandlerIdx;
163 static int askProcDataHandlerIdx;
164 static int askRecoverDataHandlerIdx;
165 static int restartBcastHandlerIdx;
166 static int recoverProcDataHandlerIdx;
167 static int restartBeginHandlerIdx;
168 static int recvRemoteChkpHandlerIdx;
169 static int replicaDieHandlerIdx;
170 static int replicaChkpStartHandlerIdx;
171 static int replicaDieBcastHandlerIdx;
172 static int replicaRecoverHandlerIdx;
173 static int replicaChkpDoneHandlerIdx;
174 static int replicaBeginFailureInjectionHandlerIdx;
175 static int recoverRemoteProcDataHandlerIdx;
176 static int recoverRemoteArrayDataHandlerIdx;
177 static int notifyHandlerIdx;
178 // compute the backup processor
179 // FIXME: avoid crashed processors
180 #if CMK_CONVERSE_MPI
181 static int pingHandlerIdx;
182 static int pingCheckHandlerIdx;
183 static int buddyDieHandlerIdx;
184 static double lastPingTime = -1;
185
186 extern "C" void mpi_restart_crashed(int pe, int rank);
187 extern "C" int  find_spare_mpirank(int pe,int partition);
188
189 void pingBuddy();
190 void pingCheckHandler();
191
192 #endif
193 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
194
195 inline int CkMemCheckPT::BuddyPE(int pe)
196 {
197   int budpe;
198 #if NODE_CHECKPOINT
199   // buddy is the processor with same rank on the next physical node
200   int r1 = CmiPhysicalRank(pe);
201   int budnode = CmiPhysicalNodeID(pe);
202   do {
203     budnode = (budnode+1)%CmiNumPhysicalNodes();
204     int *pelist;
205     int num;
206     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
207     budpe = pelist[r1 % num];
208   } while (isFailed(budpe));
209   if (budpe == pe) {
210     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
211     CmiAbort("Failed to find a buddy processor");
212   }
213 #else
214   budpe = pe;
215   budpe = (budpe+1)%CkNumPes();
216 #endif
217   return budpe;
218 }
219
220 // called in array element constructor
221 // choose and register with 2 buddies for checkpoiting 
222 #if CMK_MEM_CHECKPOINT
223 void ArrayElement::init_checkpt() {
224   if (_memChkptOn == 0) return;
225   if (CkInRestarting()) {
226     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
227   }
228   // only master init checkpoint
229   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
230
231   budPEs[0] = CkMyPe();
232   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
233   CmiAssert(budPEs[0] != budPEs[1]);
234   // inform checkPTMgr
235   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
236   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
237   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
238 }
239 #endif
240
241 // entry function invoked by checkpoint mgr asking for checkpoint data
242 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
243 #if CMK_MEM_CHECKPOINT
244   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
245   //char index[128];   thisIndexMax.sprint(index);
246   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
247   CkLocMgr *locMgr = thisArray->getLocMgr();
248   CmiAssert(myRec!=NULL);
249   int size;
250   {
251     PUP::sizer p;
252     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
253     size = p.size();
254   }
255   int packSize = size/sizeof(double) +1;
256   CkArrayCheckPTMessage *msg =
257     new (packSize, 0) CkArrayCheckPTMessage;
258   msg->len = size;
259   msg->index =thisIndexMax;
260   msg->aid = thisArrayID;
261   msg->locMgr = locMgr->getGroupID();
262   msg->cp_flag = 1;
263   {
264     PUP::toMem p(msg->packData);
265     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
266   }
267
268   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
269   checkptMgr.recvData(msg, 2, budPEs);
270   delete m;
271 #endif
272 }
273
274 // checkpoint holder class - for memory checkpointing
275 class CkMemCheckPTInfo: public CkCheckPTInfo
276 {
277   CkArrayCheckPTMessage *ckBuffer;
278   public:
279   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
280     CkCheckPTInfo(a, loc, idx, pno)
281   {
282     ckBuffer = NULL;
283   }
284   ~CkMemCheckPTInfo() 
285   {
286     if (ckBuffer) delete ckBuffer; 
287   }
288   inline void updateBuffer(CkArrayCheckPTMessage *data) 
289   {
290     CmiAssert(data!=NULL);
291     if (ckBuffer) delete ckBuffer;
292     ckBuffer = data;
293   }    
294   inline CkArrayCheckPTMessage * getCopy()
295   {
296     if (ckBuffer == NULL) {
297       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
298       CmiAbort("Abort!");
299     }
300     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
301   }     
302   inline void updateBuddy(int b1, int b2) {
303     CmiAssert(ckBuffer);
304     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
305     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
306     CmiAssert(pNo != CkMyPe());
307   }
308   inline int getSize() { 
309     CmiAssert(ckBuffer);
310     return ckBuffer->len; 
311   }
312 };
313
314 // checkpoint holder class - for in-disk checkpointing
315 class CkDiskCheckPTInfo: public CkCheckPTInfo 
316 {
317   char *fname;
318   int bud1, bud2;
319   int len;                      // checkpoint size
320   public:
321   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
322   {
323 #if CMK_USE_MKSTEMP
324     fname = new char[64];
325     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
326     mkstemp(fname);
327 #else
328     fname=tmpnam(NULL);
329 #endif
330     bud1 = bud2 = -1;
331     len = 0;
332   }
333   ~CkDiskCheckPTInfo() 
334   {
335     remove(fname);
336   }
337   inline void updateBuffer(CkArrayCheckPTMessage *data) 
338   {
339     double t = CmiWallTimer();
340     // unpack it
341     envelope *env = UsrToEnv(data);
342     CkUnpackMessage(&env);
343     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
344     FILE *f = fopen(fname,"wb");
345     PUP::toDisk p(f);
346     CkPupMessage(p, (void **)&data);
347     // delay sync to the end because otherwise the messages are blocked
348     //    fsync(fileno(f));
349     fclose(f);
350     bud1 = data->bud1;
351     bud2 = data->bud2;
352     len = data->len;
353     delete data;
354     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
355   }
356   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
357   {
358     CkArrayCheckPTMessage *data;
359     FILE *f = fopen(fname,"rb");
360     PUP::fromDisk p(f);
361     CkPupMessage(p, (void **)&data);
362     fclose(f);
363     data->bud1 = bud1;                          // update the buddies
364     data->bud2 = bud2;
365     return data;
366   }
367   inline void updateBuddy(int b1, int b2) {
368     bud1 = b1; bud2 = b2;
369     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
370     CmiAssert(pNo != CkMyPe());
371   }
372   inline int getSize() { 
373     return len; 
374   }
375 };
376
377 CkMemCheckPT::CkMemCheckPT(int w)
378 {
379   int numnodes = 0;
380 #if NODE_CHECKPOINT
381   numnodes = CmiNumPhysicalNodes();
382 #else
383   numnodes = CkNumPes();
384 #endif
385 #if CK_NO_PROC_POOL
386   if (numnodes <= 2)
387 #else
388     if (numnodes  == 1)
389 #endif
390     {
391       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
392       _memChkptOn = 0;
393     }
394   inRestarting = 0;
395   inCheckpointing = 0;
396   recvCount = peCount = 0;
397   ackCount = 0;
398   expectCount = -1;
399   where = w;
400   replicaAlive = 1;
401   notifyReplica = 0;
402 #if CMK_CONVERSE_MPI
403   void pingBuddy();
404   void pingCheckHandler();
405   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
406   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
407 #endif
408   chkpTable[0] = NULL;
409   chkpTable[1] = NULL;
410   maxIter = -1;
411   recvIterCount = 0;
412   localDecided = false;
413   softFailureInjected = false;
414   if(killFlag == 2){
415     localSeed = failureSeed;
416     softLocalSeed = failureSeed*2;
417     //only inject failure to pe [0][1]
418     if(CkMyPe()==0 && CmiMyPartition()==0)
419       thisProxy[CkMyPe()].generateFailure();
420     
421     if(SMTBF!=-1 && CmiMyPartition()==1 && CkMyPe()==0){
422       thisProxy[CkMyPe()].generateSoftFailure();
423     }
424
425   }
426 }
427
428 void CkMemCheckPT::replicaInjectFailure(){
429   char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
430   CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
431   CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
432 }
433
434 void CkMemCheckPT::generateFailure(){
435   int rand3 = rand_r(&localSeed);
436   double sec;
437   if(strcmp(failureDist,"E")==0)
438     sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
439   else if(strcmp(failureDist,"W")==0)
440     sec = alpha*pow(-log(1.0f - ((double)rand3)/(long long int)(RAND_MAX)),1/beta);
441   thisProxy[1].killAfter(sec);
442  
443 }
444
445 void CkMemCheckPT::killAfter(double sec){
446   killTime = CmiWallTimer()+sec;
447   printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
448   CcdCallFnAfter(killLocal,NULL,sec*1000);
449 }
450
451 void CkMemCheckPT::generateSoftFailure(){
452   int rand = rand_r(&softLocalSeed);
453   double sec;
454   if(strcmp(failureDist,"E")==0){
455     sec = -log(1.0f - ((double)rand)/(long long int)(RAND_MAX))*SMTBF;
456   }
457   else{
458     sec = s_alpha*pow(-log(1.0f - ((double)rand)/(long long int)(RAND_MAX)),1/beta);
459   }
460   printf("[%d][%d] inject soft failure after %.6lf s (MEMCKPT)\n",CmiMyPartition(),CkMyPe(),sec);
461   CcdCallFnAfter(injectSoftFailure,NULL,sec*1000);
462 }
463
464 CkMemCheckPT::~CkMemCheckPT()
465 {
466   int len = ckTable.length();
467   for (int i=0; i<len; i++) {
468     delete ckTable[i];
469   }
470 }
471
472 void CkMemCheckPT::pup(PUP::er& p) 
473
474   CBase_CkMemCheckPT::pup(p); 
475   p|cpStarter;
476   p|thisFailedPe;
477   p|failedPes;
478   p|ckCheckPTGroupID;           // recover global variable
479   p|cpCallback;                 // store callback
480   p|where;                      // where to checkpoint
481   p|peCount;
482   p|localSeed;
483   p|softLocalSeed;
484   if (p.isUnpacking()) {
485     recvCount = 0;
486 #if CMK_CONVERSE_MPI
487     void pingBuddy();
488     void pingCheckHandler();
489     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
490     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
491 #endif
492     maxIter = -1;
493     recvIterCount = 0;
494     localDecided = false;
495   }
496 }
497
498 void CkMemCheckPT::getIter(){
499   localDecided = true;
500   localMaxIter = maxIter+1;
501   if(CkMyPe()==0){
502     CkPrintf("local max iter is %d\n",localMaxIter);
503   }
504   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
505   int elemCount = CkCountChkpSyncElements();
506   if(CkMyPe()==0)
507     startTime = CmiWallTimer();
508   if(elemCount == 0){
509     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
510   }
511 }
512
513 //when one replica failes, decide the next chkp iter
514
515 void CkMemCheckPT::recvIter(int iter){
516   if(maxIter<iter){
517     maxIter=iter;
518   }
519 }
520
521 void CkMemCheckPT::recvMaxIter(int iter){
522   localDecided = false;
523   if(CkMyPe()==0)
524     CkPrintf("checkpoint iteration is %d\n",iter);
525   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
526 }
527
528 void CkMemCheckPT::reachChkpIter(){
529   recvIterCount++;
530   elemCount = CkCountChkpSyncElements();
531   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
532   if(recvIterCount == elemCount){
533     recvIterCount = 0;
534     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
535   }
536 }
537
538 void CkMemCheckPT::startChkp(){
539   if(CkInCheckpointing()){
540     return;
541   }
542   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
543   CkStartMemCheckpoint(cpCallback);
544 }
545
546 // called by checkpoint mgr to restore an array element
547 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
548 {
549 #if CMK_MEM_CHECKPOINT
550   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
551   // m->index.print();
552   PUP::fromMem p(m->packData);
553   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
554   CmiAssert(mgr);
555 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
556   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
557 #else
558   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
559 #endif
560
561   // find a list of array elements bound together
562   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
563   CmiAssert(elt);
564   CkLocRec_local *rec = elt->myRec;
565   CkVec<CkMigratable *> list;
566   mgr->migratableList(rec, list);
567   CmiAssert(list.length() > 0);
568   for (int l=0; l<list.length(); l++) {
569     elt = (ArrayElement *)list[l];
570     elt->budPEs[0] = m->bud1;
571     elt->budPEs[1] = m->bud2;
572     //    reset, may not needed now
573     // for now.
574     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
575       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
576       if (c) c->redNo = 0;
577     }
578   }
579 #endif
580 }
581
582 // return 1 if pe is a crashed processor
583 int CkMemCheckPT::isFailed(int pe)
584 {
585   for (int i=0; i<failedPes.length(); i++)
586     if (failedPes[i] == pe) return 1;
587   return 0;
588 }
589
590 // add pe into history list of all failed processors
591 void CkMemCheckPT::failed(int pe)
592 {
593   if (isFailed(pe)) return;
594   failedPes.push_back(pe);
595 }
596
597 int CkMemCheckPT::totalFailed()
598 {
599   return failedPes.length();
600 }
601
602 // create an checkpoint entry for array element of aid with index.
603 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
604 {
605   // error check, no duplicate
606   int idx, len = ckTable.size();
607   for (idx=0; idx<len; idx++) {
608     CkCheckPTInfo *entry = ckTable[idx];
609     if (index == entry->index) {
610       if (loc == entry->locMgr) {
611         // bindTo array elements
612         return;
613       }
614       // for array inheritance, the following check may fail
615       // because ArrayElement constructor of all superclasses are called
616       if (aid == entry->aid) {
617         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
618         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
619       }
620     }
621   }
622   CkCheckPTInfo *newEntry;
623   if (where == CkCheckPoint_inMEM)
624     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
625   else
626     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
627   ckTable.push_back(newEntry);
628   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
629 }
630
631 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
632 {
633 #if !CMK_CHKP_ALL       
634   int buddy = msg->bud1;
635   if (buddy == CkMyPe()) buddy = msg->bud2;
636   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
637   recvData(msg);
638   // ack
639   thisProxy[buddy].gotData();
640 #else
641   chkpTable[0] = NULL;
642   chkpTable[1] = NULL;
643   recvArrayCheckpoint(msg);
644   thisProxy[msg->bud2].gotData();
645 #endif
646 }
647
648 void CkMemCheckPT::chkpLocalStart()
649 {
650   CpvAccess(localStarted) = 1;
651 }
652
653 // loop through my checkpoint table and ask checkpointed array elements
654 // to send me checkpoint data.
655 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
656 void CkMemCheckPT::doItNow(int starter)
657 {
658   checkpointed = 1;
659   //cpCallback = cb;
660   cpStarter = starter;
661   inCheckpointing = 1;
662   if (CkMyPe() == cpStarter) {
663     startTime = CmiWallTimer();
664     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
665   }
666 #if CMK_CONVERSE_MPI
667   if(CmiNumPartition()==1)
668 #endif    
669   {
670 #if !CMK_CHKP_ALL
671     int len = ckTable.length();
672     for (int i=0; i<len; i++) {
673       CkCheckPTInfo *entry = ckTable[i];
674       // always let the bigger number processor send request
675       //if (CkMyPe() < entry->pNo) continue;
676       // always let the smaller number processor send request, may on same proc
677       if (!isMaster(entry->pNo)) continue;
678       // call inmem_checkpoint to the array element, ask it to send
679       // back checkpoint data via recvData().
680       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
681       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
682     }
683     // if my table is empty, then I am done
684     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
685 #else
686     startArrayCheckpoint();
687 #endif
688     // pack and send proc level data
689     sendProcData();
690   }
691 #if CMK_CONVERSE_MPI
692   else
693   {
694     startCheckpoint();
695   }
696 #endif
697 }
698
699 class MemElementPacker : public CkLocIterator{
700   private:
701     //    CkLocMgr *locMgr;
702     PUP::er &p;
703     std::map<CkHashCode,CkLocation> arrayMap;
704   public:
705     MemElementPacker(PUP::er &p_):p(p_){};
706     void addLocation(CkLocation &loc){
707       CkArrayIndexMax idx = loc.getIndex();
708       arrayMap[idx.hash()] = loc;
709     }
710     void writeCheckpoint(){
711       std::map<CkHashCode, CkLocation>::iterator it;
712       for(it = arrayMap.begin();it!=arrayMap.end();it++){
713         CkLocation loc = it->second;
714         CkLocMgr *locMgr = loc.getManager();
715         CkArrayIndexMax idx = loc.getIndex();
716         CkGroupID gID = locMgr->ckGetGroupID();
717         p|gID;
718         p|idx;
719         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
720       }
721     }
722 };
723
724 void pupAllElements(PUP::er &p){
725 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
726   int numElements;
727   if(!p.isUnpacking()){
728     numElements = CkCountArrayElements();
729   }
730   p | numElements;
731   if(!p.isUnpacking()){
732     MemElementPacker packer(p);
733     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
734     packer.writeCheckpoint();
735   }
736 #endif
737 }
738
739 void CkMemCheckPT::startArrayCheckpoint(){
740 #if CMK_CHKP_ALL
741   int size;
742   {
743     PUP::sizer psizer;
744     pupAllElements(psizer);
745     size = psizer.size();
746   }
747   int packSize = size/sizeof(double)+1;
748   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
749   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
750   msg->len = size;
751   msg->cp_flag = 1;
752   int budPEs[2];
753   msg->bud1=CkMyPe();
754   msg->bud2=ChkptOnPe(CkMyPe());
755   {
756     PUP::toMem p(msg->packData);
757     pupAllElements(p);
758   }
759   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
760   if(chkpTable[0]) delete chkpTable[0];
761   chkpTable[0] = msg;
762   //send the checkpoint to my 
763   recvCount++;
764 #endif
765 }
766
767 void CkMemCheckPT::startCheckpoint(){
768 #if CMK_CONVERSE_MPI
769   int size;
770   {
771     PUP::sizer p;
772     _handleProcData(p,CmiFalse);
773     size = p.size();
774   }
775   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
776   procMsg->len = size;
777   procMsg->cp_flag = 1;
778   {
779     PUP::toMem p(procMsg->packData);
780     _handleProcData(p,CmiFalse);
781   }
782   int pointer = CpvAccess(curPointer);
783   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
784   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
785
786   {
787     PUP::sizer psizer;
788     pupAllElements(psizer);
789     size = psizer.size();
790   }
791   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
792   msg->len = size;
793   msg->cp_flag = 1;
794   int checksum;
795   {
796     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
797       PUP::checker p(msg->packData);
798       pupAllElements(p);
799       checksum = p.getChecksum();
800     }else{  
801 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
802       PUP::toMem p(msg->packData);
803       pupAllElements(p);
804     }
805   }
806   pointer = CpvAccess(curPointer);
807   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
808   CpvAccess(chkpBuf)[pointer] = msg;
809   
810   if(CkMyPe()==0)
811     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
812   
813   if(CkReplicaAlive()==1){
814     CpvAccess(recvdLocal) = 1;
815     if(CpvAccess(use_checksum)){
816       CpvAccess(localChecksum) = checksum;
817       //only one reaplica will send
818       if(CmiMyPartition()==0){
819         char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
820         *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
821         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
822         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
823       }
824     }else{
825       if(CmiMyPartition()==0){
826         envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
827         CkPackMessage(&env);
828         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
829         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
830       }
831     }
832     if(CmiMyPartition()==0){
833       notifyReplica = 1;
834       thisProxy[CkMyPe()].doneComparison(true);
835     }
836   }
837   if(CpvAccess(recvdRemote)==1){
838     //only partition 1 will do it
839     //compare the checkpoint 
840     int size = CpvAccess(chkpBuf)[pointer]->len;
841     if(CpvAccess(use_checksum)){
842       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
843         thisProxy[CkMyPe()].doneComparison(true);
844       }
845       else{
846         thisProxy[CkMyPe()].doneComparison(true);
847         //thisProxy[CkMyPe()].doneComparison(false);
848       }
849     }else{
850       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
851         thisProxy[CkMyPe()].doneComparison(true);
852       }
853       else{
854         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
855         thisProxy[CkMyPe()].doneComparison(true);
856         //thisProxy[CkMyPe()].doneComparison(false);
857       }
858       
859       if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
860     }
861     if(CkMyPe()==0)
862       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
863   }
864   else{
865     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
866       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
867       if(CpvAccess(remoteReady)==1){
868         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
869         {       
870           int pointer = CpvAccess(curPointer);
871           //send the proc data
872           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
873           procMsg->pointer = pointer;
874           envelope * env = (envelope *)(UsrToEnv(procMsg));
875           CkPackMessage(&env);
876           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
877           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
878           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
879             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
880         }
881         //send the array checkpoint data
882         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
883         msg->pointer = CpvAccess(curPointer);
884         envelope * env = (envelope *)(UsrToEnv(msg));
885         CkPackMessage(&env);
886         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
887         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
888         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
889           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
890       }else{
891         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
892           int pointer = CpvAccess(curPointer);
893           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
894           (CpvAccess(recoverProcBuf))->pointer = pointer;
895         }
896         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
897         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
898         CpvAccess(localReady) = 1;
899       }
900       //can continue work, no need to wait for my replica
901       int _ret = 1;
902       notifyReplica = 1;
903       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
904       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
905     }
906   }
907 #endif
908 }
909
910 void CkMemCheckPT::doneComparison(bool ret){
911   int _ret = 1;
912   if(!ret){
913     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
914     _ret = 0;
915   }else{
916     _ret = 1;
917   }
918   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
919   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
920 }
921
922 void CkMemCheckPT::doneRComparison(int ret){
923   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
924   if(ret==CkNumPes()&&!softFailureInjected){
925     CpvAccess(localChkpDone) = 1;
926     if(CpvAccess(remoteChkpDone) ==1){
927       thisProxy.doneBothComparison();
928     }else{
929       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
930     }
931   }
932   else{
933     ret = 0;
934     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
935     startTime = CmiWallTimer();
936     thisProxy.RollBack();
937   }
938   if(notifyReplica == 0){
939     //notify the replica am done
940     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
941     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
942     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
943     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
944     notifyReplica = 1;
945   }
946 }
947
948 void CkMemCheckPT::doneBothComparison(){
949   CpvAccess(recvdRemote) = 0;
950   CpvAccess(remoteReady) = 0;
951   CpvAccess(localReady) = 0;
952   CpvAccess(recvdLocal) = 0;
953   CpvAccess(localChkpDone) = 0;
954   CpvAccess(remoteChkpDone) = 0;
955   CpvAccess(remoteStarted) = 0;
956   CpvAccess(localStarted) = 0;
957   CpvAccess(_remoteCrashedNode) = -1;
958   CkMemCheckPT::replicaAlive = 1;
959   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
960   CpvAccess(curPointer)^=1;
961   inCheckpointing = 0;
962   notifyReplica = 0;
963   if(CkMyPe() == 0){
964     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
965   }
966   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
967 }
968
969 void CkMemCheckPT::RollBack(){
970   //restore group data
971   checkpointed = 0;
972   inRestarting = 1;
973   int pointer = CpvAccess(curPointer)^1;//use the previous one
974   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
975   PUP::fromMem p(chkpMsg->packData);    
976
977   //destroy array elements
978   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
979   int numGroups = CkpvAccess(_groupIDTable)->size();
980   for(int i=0;i<numGroups;i++) {
981     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
982     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
983     obj->flushStates();
984     obj->ckJustMigrated();
985   }
986   //restore array elements
987
988   int numElements;
989   p|numElements;
990
991   if(p.isUnpacking()){
992     for(int i=0;i<numElements;i++){
993       CkGroupID gID;
994       CkArrayIndex idx;
995       p|gID;
996       p|idx;
997       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
998       CmiAssert(mgr);
999       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1000     }
1001   }
1002   
1003   softFailureInjected = false;
1004   CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
1005   contribute(cb);
1006 }
1007
1008   void CkMemCheckPT::notifyReplicaDie(int pe){
1009     replicaAlive = 0;
1010     CpvAccess(_remoteCrashedNode) = pe;
1011   }
1012
1013   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
1014   {
1015 #if CMK_CHKP_ALL
1016     int idx = 1;
1017     if(msg->bud1 == CkMyPe()){
1018       idx = 0;
1019     }
1020     int isChkpting = msg->cp_flag;
1021     if(isChkpting == 1){
1022       if(chkpTable[idx]) delete chkpTable[idx];
1023     }
1024     chkpTable[idx] = msg;
1025     if(isChkpting){
1026       recvCount++;
1027       if(recvCount == 2){
1028         if (where == CkCheckPoint_inMEM) {
1029           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1030         }
1031         else if (where == CkCheckPoint_inDISK) {
1032           // another barrier for finalize the writing using fsync
1033           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1034           contribute(0,NULL,CkReduction::sum_int,localcb);
1035         }
1036         else
1037           CmiAbort("Unknown checkpoint scheme");
1038         recvCount = 0;
1039       }
1040     }
1041 #endif
1042   }
1043
1044   // don't handle array elements
1045   static inline void _handleProcData(PUP::er &p, CmiBool create)
1046   {
1047     // save readonlys, and callback BTW
1048     CkPupROData(p);
1049
1050     // save mainchares 
1051     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
1052
1053 #ifndef CMK_CHARE_USE_PTR
1054     // save non-migratable chare
1055     CkPupChareData(p);
1056 #endif
1057
1058     // save groups into Groups.dat
1059     CkPupGroupData(p,create);
1060
1061     // save nodegroups into NodeGroups.dat
1062     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
1063   }
1064
1065   void CkMemCheckPT::sendProcData()
1066   {
1067     // find out size of buffer
1068     int size;
1069     {
1070       PUP::sizer p;
1071       _handleProcData(p,CmiTrue);
1072       size = p.size();
1073     }
1074     int packSize = size;
1075     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1076     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1077     {
1078       PUP::toMem p(msg->packData);
1079       _handleProcData(p,CmiTrue);
1080     }
1081     msg->pe = CkMyPe();
1082     msg->len = size;
1083     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1084     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1085   }
1086
1087   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1088   {
1089     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1090     CpvAccess(procChkptBuf) = msg;
1091     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1092     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1093   }
1094
1095   // ArrayElement call this function to give us the checkpointed data
1096   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1097   {
1098     int len = ckTable.length();
1099     int idx;
1100     for (idx=0; idx<len; idx++) {
1101       CkCheckPTInfo *entry = ckTable[idx];
1102       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1103     }
1104     CkAssert(idx < len);
1105     int isChkpting = msg->cp_flag;
1106     ckTable[idx]->updateBuffer(msg);
1107     if (isChkpting) {
1108       // all my array elements have returned their inmem data
1109       // inform starter processor that I am done.
1110       recvCount ++;
1111       if (recvCount == ckTable.length()) {
1112         if (where == CkCheckPoint_inMEM) {
1113           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1114         }
1115         else if (where == CkCheckPoint_inDISK) {
1116           // another barrier for finalize the writing using fsync
1117           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1118           contribute(0,NULL,CkReduction::sum_int,localcb);
1119         }
1120         else
1121           CmiAbort("Unknown checkpoint scheme");
1122         recvCount = 0;
1123       } 
1124     }
1125   }
1126
1127   // only used in disk checkpointing
1128   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1129   {
1130     delete m;
1131 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1132     system("sync");
1133 #endif
1134     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1135   }
1136
1137   // only is called on cpStarter when checkpoint is done
1138   void CkMemCheckPT::cpFinish()
1139   {
1140     CmiAssert(CkMyPe() == cpStarter);
1141     peCount++;
1142     // now that all processors have finished, activate callback
1143     if (peCount == 2) 
1144     {
1145       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1146       peCount = 0;
1147       thisProxy.report();
1148     }
1149   }
1150
1151   // for debugging, report checkpoint info
1152   void CkMemCheckPT::report()
1153   {
1154     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1155     inCheckpointing = 0;
1156 #if !CMK_CHKP_ALL
1157     int objsize = 0;
1158     int len = ckTable.length();
1159     for (int i=0; i<len; i++) {
1160       CkCheckPTInfo *entry = ckTable[i];
1161       CmiAssert(entry);
1162       objsize += entry->getSize();
1163     }
1164     CmiAssert(CpvAccess(procChkptBuf));
1165     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1166 #else
1167     if(CkMyPe()==0)
1168       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1169 #endif
1170   }
1171
1172   /*****************************************************************************
1173     RESTART Procedure
1174    *****************************************************************************/
1175
1176   // master processor of two buddies
1177   inline int CkMemCheckPT::isMaster(int buddype)
1178   {
1179 #if 0
1180     int mype = CkMyPe();
1181     //CkPrintf("ismaster: %d %d\n", pe, mype);
1182     if (CkNumPes() - totalFailed() == 2) {
1183       return mype > buddype;
1184     }
1185     for (int i=1; i<CkNumPes(); i++) {
1186       int me = (buddype+i)%CkNumPes();
1187       if (isFailed(me)) continue;
1188       if (me == mype) return 1;
1189       else return 0;
1190     }
1191     return 0;
1192 #else
1193     // smaller one
1194     int mype = CkMyPe();
1195     //CkPrintf("ismaster: %d %d\n", pe, mype);
1196     if (CkNumPes() - totalFailed() == 2) {
1197       return mype < buddype;
1198     }
1199 #if NODE_CHECKPOINT
1200     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1201     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1202 #else
1203       for (int i=1; i<CkNumPes(); i++) {
1204 #endif
1205         int me = (mype+i)%CkNumPes();
1206         if (isFailed(me)) continue;
1207         if (me == buddype) return 1;
1208         else return 0;
1209       }
1210       return 0;
1211 #endif
1212     }
1213
1214
1215
1216 #if 0
1217     // helper class to pup all elements that belong to same ckLocMgr
1218     class ElementDestoryer : public CkLocIterator {
1219       private:
1220         CkLocMgr *locMgr;
1221       public:
1222         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1223         void addLocation(CkLocation &loc) {
1224           CkArrayIndex idx=loc.getIndex();
1225           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1226           loc.destroy();
1227         }
1228     };
1229 #endif
1230
1231     // restore the bitmap vector for LB
1232     void CkMemCheckPT::resetLB(int diepe)
1233     {
1234 #if CMK_LBDB_ON
1235       int i;
1236       char *bitmap = new char[CkNumPes()];
1237       // set processor available bitmap
1238       get_avail_vector(bitmap);
1239       for (i=0; i<failedPes.length(); i++)
1240         bitmap[failedPes[i]] = 0; 
1241       bitmap[diepe] = 0;
1242
1243 #if CK_NO_PROC_POOL
1244       set_avail_vector(bitmap);
1245 #endif
1246
1247       // if I am the crashed pe, rebuild my failedPEs array
1248       if (CkMyNode() == diepe)
1249         for (i=0; i<CkNumPes(); i++) 
1250           if (bitmap[i]==0) failed(i);
1251
1252       delete [] bitmap;
1253 #endif
1254     }
1255
1256     static double restartT;
1257
1258     // in case when failedPe dies, everybody go through its checkpoint table:
1259     // destory all array elements
1260     // recover lost buddies
1261     // reconstruct all array elements from check point data
1262     // called on all processors
1263     void CkMemCheckPT::restart(int diePe)
1264     {
1265 #if CMK_MEM_CHECKPOINT
1266       thisFailedPe = diePe;
1267       double curTime = CmiWallTimer();
1268       if (CkMyPe() == thisFailedPe){
1269         restartT = CmiWallTimer();
1270         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1271       }
1272       stage = (char*)"resetLB";
1273       startTime = curTime;
1274       if (CkMyPe() == thisFailedPe)
1275         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1276
1277 //#if CK_NO_PROC_POOL
1278 //      failed(diePe);  // add into the list of failed pes
1279 //#endif
1280
1281 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1282
1283       inRestarting = 1;
1284
1285       // disable load balancer's barrier
1286       //if (CkMyPe() != diePe) resetLB(diePe);
1287
1288       CKLOCMGR_LOOP(mgr->startInserting(););
1289
1290
1291       if(CmiNumPartition()==1){
1292         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1293       }else{
1294         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1295         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1296       }
1297 #endif
1298     }
1299
1300     // loally remove all array elements
1301     void CkMemCheckPT::removeArrayElements()
1302     {
1303 #if CMK_MEM_CHECKPOINT
1304       int len = ckTable.length();
1305       double curTime = CmiWallTimer();
1306       if (CkMyPe() == thisFailedPe) 
1307         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1308       stage = (char*)"removeArrayElements";
1309       startTime = curTime;
1310
1311       //  if (cpCallback.isInvalid()) 
1312       //          CkPrintf("invalid pe %d\n",CkMyPe());
1313       //          CkAbort("Didn't set restart callback\n");;
1314       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1315
1316       // get rid of all buffering and remote recs
1317       // including destorying all array elements
1318 #if CK_NO_PROC_POOL  
1319       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1320 #else
1321       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1322 #endif
1323       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1324 #endif
1325     }
1326
1327     // flush state in reduction manager
1328     void CkMemCheckPT::resetReductionMgr()
1329     {
1330       if (CkMyPe() == thisFailedPe) 
1331         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1332       stage = (char *)"resetReductionMgr";
1333       int numGroups = CkpvAccess(_groupIDTable)->size();
1334       for(int i=0;i<numGroups;i++) {
1335         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1336         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1337         obj->flushStates();
1338         obj->ckJustMigrated();
1339       }
1340
1341       //reset CmiReduce
1342       CmiResetReductions();
1343
1344       // reset again
1345       //CpvAccess(_qd)->flushStates();
1346       if(CmiNumPartition()==1){
1347         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1348       }
1349       else{
1350         if (CkMyPe() == thisFailedPe) 
1351           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1352         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1353       }
1354     }
1355
1356     // recover the lost buddies
1357     void CkMemCheckPT::recoverBuddies()
1358     {
1359       int idx;
1360       int len = ckTable.length();
1361       // ready to flush reduction manager
1362       // cannot be CkMemCheckPT::restart because destory will modify states
1363       double curTime = CmiWallTimer();
1364       if (CkMyPe() == thisFailedPe)
1365         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1366       stage = (char *)"recoverBuddies";
1367       if (CkMyPe() == thisFailedPe)
1368         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1369       startTime = curTime;
1370
1371       // recover buddies
1372       expectCount = 0;
1373 #if !CMK_CHKP_ALL
1374       for (idx=0; idx<len; idx++) {
1375         CkCheckPTInfo *entry = ckTable[idx];
1376         if (entry->pNo == thisFailedPe) {
1377 #if CK_NO_PROC_POOL
1378           // find a new buddy
1379           int budPe = BuddyPE(CkMyPe());
1380 #else
1381           int budPe = thisFailedPe;
1382 #endif
1383           CkArrayCheckPTMessage *msg = entry->getCopy();
1384           msg->bud1 = budPe;
1385           msg->bud2 = CkMyPe();
1386           msg->cp_flag = 0;            // not checkpointing
1387           thisProxy[budPe].recoverEntry(msg);
1388           expectCount ++;
1389         }
1390       }
1391 #else
1392       //send to failed pe
1393       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1394 #if CK_NO_PROC_POOL
1395         // find a new buddy
1396         int budPe = BuddyPE(CkMyPe());
1397 #else
1398         int budPe = thisFailedPe;
1399 #endif
1400         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1401         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1402         msg->cp_flag = 0;            // not checkpointing
1403         msg->bud1 = budPe;
1404         msg->bud2 = CkMyPe();
1405         thisProxy[budPe].recoverEntry(msg);
1406         expectCount ++;
1407       }
1408 #endif
1409
1410       if (expectCount == 0) {
1411         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1412       }
1413       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1414     }
1415
1416     void CkMemCheckPT::gotData()
1417     {
1418       ackCount ++;
1419       if (ackCount == expectCount) {
1420         ackCount = 0;
1421         expectCount = -1;
1422         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1423         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1424       }
1425     }
1426
1427     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1428     {
1429
1430       for (int i=0; i<n; i++) {
1431         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1432         mgr->updateLocation(idx[i], nowOnPe);
1433       }
1434       thisProxy[nowOnPe].gotReply();
1435     }
1436
1437     // restore array elements
1438     void CkMemCheckPT::recoverArrayElements()
1439     {
1440       double curTime = CmiWallTimer();
1441       int len = ckTable.length();
1442       if (CkMyPe() == thisFailedPe)
1443         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1444       stage = (char *)"recoverArrayElements";
1445       startTime = curTime;
1446       int flag = 0;
1447       // recover all array elements
1448       int count = 0;
1449
1450 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1451       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1452       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1453 #endif
1454
1455 #if !CMK_CHKP_ALL
1456       for (int idx=0; idx<len; idx++)
1457       {
1458         CkCheckPTInfo *entry = ckTable[idx];
1459 #if CK_NO_PROC_POOL
1460         // the bigger one will do 
1461         //    if (CkMyPe() < entry->pNo) continue;
1462         if (!isMaster(entry->pNo)) continue;
1463 #else
1464         // smaller one do it, which has the original object
1465         if (CkMyPe() == entry->pNo+1 || 
1466             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1467 #endif
1468         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1469
1470         entry->updateBuddy(CkMyPe(), entry->pNo);
1471         CkArrayCheckPTMessage *msg = entry->getCopy();
1472         // gzheng
1473         //thisProxy[CkMyPe()].inmem_restore(msg);
1474         inmem_restore(msg);
1475 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1476         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1477         int homePe = mgr->homePe(msg->index);
1478         if (homePe != CkMyPe()) {
1479           gmap[homePe].push_back(msg->locMgr);
1480           imap[homePe].push_back(msg->index);
1481         }
1482 #endif
1483         CkFreeMsg(msg);
1484         count ++;
1485       }
1486 #else
1487       char * packData;
1488       if(CmiNumPartition()==1){
1489         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1490         packData = (char *)msg->packData;
1491 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1492         recoverAll(packData,gmap,imap);
1493 #else
1494         recoverAll(packData);
1495 #endif
1496         CkFreeMsg(msg);
1497       }
1498       else{
1499         int pointer = CpvAccess(curPointer);
1500         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1501         packData = msg->packData;
1502 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1503         recoverAll(packData,gmap,imap);
1504 #else
1505         recoverAll(packData);
1506 #endif
1507         CkFreeMsg(msg);
1508       }
1509 #endif
1510 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1511       for (int i=0; i<CkNumPes(); i++) {
1512         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1513           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1514           flag++;       
1515         }
1516       }
1517       delete [] imap;
1518       delete [] gmap;
1519 #endif
1520       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1521       //if (CkMyPe() == thisFailedPe)
1522
1523       CKLOCMGR_LOOP(mgr->doneInserting(););
1524
1525       // _crashedNode = -1;
1526       CpvAccess(_crashedNode) = -1;
1527 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1528       if (CkMyPe() == 0)
1529         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1530 #else
1531       if(flag == 0)
1532       {
1533         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1534       }
1535 #endif
1536     }
1537
1538     void CkMemCheckPT::gotReply(){
1539       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1540     }
1541
1542     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1543 #if CMK_CHKP_ALL
1544       PUP::fromMem p(packData);
1545       int numElements;
1546       p|numElements;
1547       if(p.isUnpacking()){
1548         for(int i=0;i<numElements;i++){
1549           CkGroupID gID;
1550           CkArrayIndex idx;
1551           p|gID;
1552           p|idx;
1553           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1554           int homePe = mgr->homePe(idx);
1555 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1556           mgr->resume(idx,p,CmiTrue,CmiTrue);
1557 #else
1558           if(CmiNumPartition()==1)
1559             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1560           else{
1561             if(CkMyPe()==thisFailedPe){
1562               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1563             }
1564             else{
1565               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1566             }
1567           }
1568 #endif
1569 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1570           homePe = mgr->homePe(idx);
1571           if (homePe != CkMyPe()) {
1572             gmap[homePe].push_back(gID);
1573             imap[homePe].push_back(idx);
1574           }
1575 #endif
1576         }
1577       }
1578 #endif
1579     }
1580
1581
1582     // on every processor
1583     // turn load balancer back on
1584     void CkMemCheckPT::finishUp()
1585     {
1586       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1587       //CKLOCMGR_LOOP(mgr->doneInserting(););
1588
1589       if (CkMyPe() == thisFailedPe)
1590       {
1591         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1592         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1593         fflush(stdout);
1594       }
1595
1596
1597       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1598       inRestarting = 0;
1599       maxIter = -1;
1600 #if CMK_CONVERSE_MPI    
1601       if(CmiNumPartition()!=1){
1602         CpvAccess(recvdProcChkp) = 0;
1603         CpvAccess(recvdArrayChkp) = 0;
1604         CpvAccess(curPointer)^=1;
1605         //notify my replica, restart is done
1606         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1607           CpvAccess(remoteStarted) =0;
1608           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1609           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1610           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1611         }
1612       }
1613       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1614         lastPingTime = CmiWallTimer();
1615         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1616       }
1617       //inject next failure
1618       if(killFlag==2){
1619         if(CkMyPe()==0&&CmiMyPartition()==0)
1620           thisProxy[CkMyPe()].generateFailure();
1621       }
1622 #endif
1623
1624 #if CK_NO_PROC_POOL
1625 #if NODE_CHECKPOINT
1626       int numnodes = CmiNumPhysicalNodes();
1627 #else
1628       int numnodes = CkNumPes();
1629 #endif
1630       if (numnodes-totalFailed() <=2) {
1631         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1632         _memChkptOn = 0;
1633       }
1634 #endif
1635     }
1636
1637     void CkMemCheckPT::recoverFromSoftFailure()
1638     {
1639       inRestarting = 0;
1640       maxIter = -1;
1641       CpvAccess(recvdRemote) = 0;
1642       CpvAccess(recvdLocal) = 0;
1643       CpvAccess(localChkpDone) = 0;
1644       CpvAccess(remoteChkpDone) = 0;
1645       CpvAccess(remoteReady) = 0;
1646       CpvAccess(localReady) = 0;
1647       inCheckpointing = 0;
1648       notifyReplica = 0;
1649       CpvAccess(remoteStarted) = 0;
1650       CpvAccess(localStarted) = 0;
1651       CpvAccess(_remoteCrashedNode) = -1;
1652       CkMemCheckPT::replicaAlive = 1;
1653       inCheckpointing = 0;
1654       notifyReplica = 0;
1655       if(CkMyPe() == 0){
1656         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1657       }
1658       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1659       if(killFlag==2 && CmiMyPartition()==1 && CkMyPe()==0){
1660         thisProxy[CkMyPe()].generateSoftFailure();
1661       }
1662     }
1663     // called only on 0
1664     void CkMemCheckPT::quiescence(CkCallback &cb)
1665     {
1666       static int pe_count = 0;
1667       pe_count ++;
1668       CmiAssert(CkMyPe() == 0);
1669       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1670       if (pe_count == CkNumPes()) {
1671         pe_count = 0;
1672         cb.send();
1673       }
1674     }
1675
1676     // User callable function - to start a checkpoint
1677     // callback cb is used to pass control back
1678     void CkStartMemCheckpoint(CkCallback &cb)
1679     {
1680 #if CMK_MEM_CHECKPOINT
1681       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1682       /*if (_memChkptOn == 0) {
1683         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1684         cb.send();
1685         return;
1686         }*/
1687       if (CkInRestarting()) {
1688         // trying to checkpointing during restart
1689         cb.send();
1690         return;
1691       }
1692       if(CkInCheckpointing())
1693         return;
1694       // store user callback and user data
1695       CkMemCheckPT::cpCallback = cb;
1696
1697
1698       //send to my replica that checkpoint begins 
1699       if(CkReplicaAlive()==1){
1700         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1701         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1702         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1703       }
1704       CpvAccess(localStarted) = 1;
1705       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1706       checkptMgr.chkpLocalStart();
1707       // broadcast to start check pointing
1708       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1709         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1710         checkptMgr.doItNow(CkMyPe());
1711       }
1712 #else
1713       // when mem checkpoint is disabled, invike cb immediately
1714       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1715       cb.send();
1716 #endif
1717     }
1718
1719     void CkRestartCheckPoint(int diePe)
1720     {
1721       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1722       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1723       // broadcast
1724       checkptMgr.restart(diePe);
1725     }
1726
1727     static int _diePE = -1;
1728
1729     // callback function used locally by ccs handler
1730     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1731     {
1732       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1733       CkRestartCheckPoint(_diePE);
1734     }
1735
1736
1737     // called on crashed PE
1738     static void restartBeginHandler(char *msg)
1739     {
1740       CmiFree(msg);
1741 #if CMK_MEM_CHECKPOINT
1742 #if CMK_USE_BARRIER
1743       if(CkMyPe()!=_diePE){
1744         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1745         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1746         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1747         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1748       }else{
1749         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1750         CkRestartCheckPointCallback(NULL, NULL);
1751       }
1752 #else
1753       static int count = 0;
1754       CmiAssert(CkMyPe() == _diePE);
1755       count ++;
1756       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1757         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1758         CkRestartCheckPointCallback(NULL, NULL);
1759         count = 0;
1760       }
1761 #endif
1762 #endif
1763     }
1764
1765     extern void _discard_charm_message();
1766     extern void _resume_charm_message();
1767
1768     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1769       return data;
1770     }
1771
1772     static void restartBcastHandler(char *msg)
1773     {
1774 #if CMK_MEM_CHECKPOINT
1775       // advance phase counter
1776       CkMemCheckPT::inRestarting = 1;
1777       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1778       // gzheng
1779       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1780
1781       if (CkMyPe()==_diePE)
1782         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1783
1784       // reset QD counters
1785       /*  gzheng
1786           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1787        */
1788
1789       /*  gzheng
1790           if (CkMyPe()==_diePE)
1791           CkRestartCheckPointCallback(NULL, NULL);
1792        */
1793       CmiFree(msg);
1794
1795       _resume_charm_message();
1796
1797       // reduction
1798       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1799       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1800 #if CMK_USE_BARRIER
1801       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1802 #else
1803       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1804 #endif 
1805       checkpointed = 0;
1806 #endif
1807     }
1808
1809     extern void _initDone();
1810
1811     bool compare(char * buf1, char *buf2){
1812       if(CkMyPe()==0)
1813         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1814       PUP::checker pchecker(buf1,buf2);
1815       pchecker.skip();
1816       int numElements;
1817       pchecker|numElements;
1818       for(int i=0;i<numElements;i++){
1819         CkGroupID gID;
1820         CkArrayIndex idx;
1821
1822         pchecker|gID;
1823         pchecker|idx;
1824
1825         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1826         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1827       }
1828       if(CkMyPe()==0)
1829         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1830       //int fault_num = pchecker.getFaultNum();
1831       bool result = pchecker.getResult();
1832       /*if(!result){
1833         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1834       }*/
1835       return result;
1836     }
1837
1838     int getChecksum(char * buf){
1839       PUP::checker pchecker(buf);
1840       pchecker.skip();
1841       int numElements;
1842       pchecker|numElements;
1843 //      CkPrintf("num %d\n",numElements);
1844       for(int i=0;i<numElements;i++){
1845         CkGroupID gID;
1846         CkArrayIndex idx;
1847
1848         pchecker|gID;
1849         pchecker|idx;
1850
1851         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1852         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1853       }
1854       return pchecker.getChecksum();
1855     }
1856
1857     static void recvRemoteChkpHandler(char *msg){
1858       CpvAccess(remoteChkpDone) = 1;
1859       if(CpvAccess(use_checksum)){
1860         if(CkMyPe()==0)
1861           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1862         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1863         CpvAccess(recvdRemote) = 1;
1864         if(CpvAccess(recvdLocal)==1){
1865           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1866             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1867           }
1868           else{
1869             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1870             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1871           }
1872         }
1873         CmiFree(msg);
1874       }else{
1875         envelope *env = (envelope *)msg;
1876         CkUnpackMessage(&env);
1877         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1878         if(CpvAccess(recvdLocal)==1){
1879           int pointer = CpvAccess(curPointer);
1880           int size = CpvAccess(chkpBuf)[pointer]->len;
1881           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1882             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1883           }else
1884           {
1885             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1886             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1887           }
1888           CmiFree(msg);
1889           if(CkMyPe()==0)
1890             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1891         }else{
1892           CpvAccess(recvdRemote) = 1;
1893           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1894           CpvAccess(buddyBuf) = chkpMsg;
1895         }
1896       }
1897     }
1898
1899     static void replicaRecoverHandler(char *msg){
1900       //fflush(stdout);
1901       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1902       CpvAccess(remoteChkpDone) = 1;
1903       if(CpvAccess(localChkpDone) == 1)
1904         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1905       else{  
1906         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1907         //CmiAbort("impossible state");
1908       }
1909       CmiFree(msg);
1910     }
1911
1912     
1913     static void replicaBeginFailureInjectionHandler(char * msg){
1914       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1915       checkptMgr.generateFailure();
1916       CmiFree(msg);
1917     }
1918     
1919     static void replicaChkpDoneHandler(char *msg){
1920       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1921       CpvAccess(remoteChkpDone) = 1;
1922       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1923       if(CpvAccess(localChkpDone) == 1)
1924         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1925       else  
1926         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1927       CmiFree(msg);
1928     }
1929
1930     static void replicaDieHandler(char * msg){
1931 #if CMK_CONVERSE_MPI
1932       //broadcast to every one in my replica
1933       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1934       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1935 #endif
1936     }
1937
1938     static void replicaChkpStartHandler(char * msg){
1939       if(CkMyPe()==0){
1940         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1941       }
1942       CpvAccess(remoteStarted) =1;
1943       if(CpvAccess(localStarted)==1){    
1944         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1945         checkptMgr.doItNow(0);
1946       }
1947     }
1948
1949     static void replicaDieBcastHandler(char *msg){
1950       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1951       CpvAccess(_remoteCrashedNode) = diePe;
1952       if(CkMyPe()==diePe){
1953         CmiPrintf("pe %d in replica world die\n",diePe);
1954         fflush(stdout);
1955       }
1956       
1957       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1958         find_spare_mpirank(diePe,CmiMyPartition()^1);
1959       }
1960
1961       //what if am already ready to checkpoint
1962       if(CpvAccess(resilience)!=1){
1963         CkMemCheckPT::replicaAlive = 0;
1964         if(CpvAccess(localStarted)==1){    
1965           if(CkMyPe()==0){
1966             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1967             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1968             checkptMgr.doItNow(0);
1969           }
1970         }
1971         //broadcast to my partition to get local max iter
1972         else{
1973           if(CkMyPe()==diePe){
1974             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
1975           }
1976           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1977         }
1978       }
1979
1980       CmiFree(msg);
1981     }
1982
1983     static void recoverRemoteProcDataHandler(char *msg){
1984       envelope *env = (envelope *)msg;
1985       CkUnpackMessage(&env);
1986       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1987
1988       //store the checkpoint
1989       int pointer = procMsg->pointer;
1990
1991
1992       if(CkMyPe()==CpvAccess(_crashedNode)){
1993         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1994         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1995         PUP::fromMem p(procMsg->packData);
1996         _handleProcData(p,CmiTrue);
1997         _initDone();
1998         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
1999       }
2000       else{
2001         //shud never happen
2002         CmiAbort("non crashed pe received proc data");
2003         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
2004         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
2005         //_handleProcData(p,CmiFalse);
2006       }
2007       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2008       CKLOCMGR_LOOP(mgr->startInserting(););
2009
2010       CpvAccess(recvdProcChkp) =1;
2011       if(CpvAccess(recvdArrayChkp)==1){
2012         _resume_charm_message();
2013         _diePE = CpvAccess(_crashedNode);
2014         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2015         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2016         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2017 #if CMK_USE_BARRIER
2018         if(CpvAccess(resilience)==1){
2019           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2020         }else
2021           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2022 #else
2023         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2024 #endif 
2025       }
2026     }
2027
2028     static void recoverRemoteArrayDataHandler(char *msg){
2029       envelope *env = (envelope *)msg;
2030       CkUnpackMessage(&env);
2031       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
2032
2033       //store the checkpoint
2034       int pointer = chkpMsg->pointer;
2035       CpvAccess(curPointer) = pointer;
2036       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
2037       CpvAccess(chkpBuf)[pointer] = chkpMsg;
2038       CpvAccess(recvdArrayChkp) =1;
2039       CkMemCheckPT::inRestarting = 1;
2040
2041       if(CkMyPe()==CpvAccess(_crashedNode))
2042         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
2043       
2044       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
2045         _resume_charm_message();
2046         _diePE = CpvAccess(_crashedNode);
2047         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
2048         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2049         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2050         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2051 #if CMK_USE_BARRIER
2052         if(CpvAccess(resilience)==1){
2053           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2054         }else
2055           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2056 #else
2057         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2058 #endif 
2059       }
2060     }
2061
2062     static void recvPhaseHandler(char * msg)
2063     {
2064       //decrease the phase number so that the crashed replica can communicate with the healthy one
2065       CpvAccess(_curRestartPhase)--;
2066       
2067       CkMemCheckPT::inRestarting = 1;
2068       CmiFree(msg);
2069       //notify the buddy in the replica now i can receive the checkpoint msg
2070       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
2071         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2072         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
2073         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
2074         //timer start
2075         if(CpvAccess(_crashedNode)==CkMyPe()){
2076           CkMemCheckPT::startTime = restartT = CmiWallTimer();
2077           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
2078         }
2079       }else{
2080         CpvAccess(curPointer)^=1;
2081         CkMemCheckPT::inRestarting = 1;
2082         _resume_charm_message();
2083         _diePE = CpvAccess(_crashedNode);
2084       }
2085     }
2086     
2087     static void askRecoverDataHandler(char * msg){
2088       CmiFree(msg);
2089       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2090         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
2091       if(CpvAccess(resilience)!=1){
2092         CpvAccess(remoteReady)=1;
2093         if(CpvAccess(localReady)==1){
2094           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2095           {     
2096             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
2097             CkPackMessage(&env);
2098             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2099             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2100             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2101           }
2102           //send the array checkpoint data
2103           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2104           CkPackMessage(&env);
2105           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2106           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2107           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2108             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2109         }
2110       }else{
2111         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2112         int pointer = CpvAccess(curPointer)^1;
2113         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2114         procMsg->pointer = pointer;
2115         envelope * env = (envelope *)(UsrToEnv(procMsg));
2116         CkPackMessage(&env);
2117         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2118         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2119         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2120         
2121         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2122         arrayMsg->pointer = pointer;
2123         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2124         CkPackMessage(&env1);
2125         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2126         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2127         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2128       }
2129     }
2130     // called on crashed processor
2131     static void recoverProcDataHandler(char *msg)
2132     {
2133 #if CMK_MEM_CHECKPOINT
2134       int i;
2135       envelope *env = (envelope *)msg;
2136       CkUnpackMessage(&env);
2137       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2138       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2139       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2140       PUP::fromMem p(procMsg->packData);
2141       _handleProcData(p,CmiTrue);
2142
2143       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2144       // gzheng
2145       CKLOCMGR_LOOP(mgr->startInserting(););
2146
2147       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2148       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2149       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2150       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2151
2152       _initDone();
2153       //   CpvAccess(_qd)->flushStates();
2154       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2155 #endif
2156     }
2157     //for replica, got the phase number from my neighbor processor in the current partition
2158     static void askPhaseHandler(char *msg)
2159     {
2160 #if CMK_MEM_CHECKPOINT
2161       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2162       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2163       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2164       CmiSetHandler(msg, recvPhaseHandlerIdx);
2165       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2166 #endif
2167     }
2168     // called on its backup processor
2169     // get backup message buffer and sent to crashed processor
2170     static void askProcDataHandler(char *msg)
2171     {
2172 #if CMK_MEM_CHECKPOINT
2173       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2174       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2175       if (CpvAccess(procChkptBuf) == NULL)  {
2176         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2177         CkAbort("no checkpoint found");
2178       }
2179       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2180       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2181
2182       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2183
2184       CkPackMessage(&env);
2185       CmiSetHandler(env, recoverProcDataHandlerIdx);
2186       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2187       CpvAccess(procChkptBuf) = NULL;
2188       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2189 #endif
2190     }
2191
2192     // called on PE 0
2193     void qd_callback(void *m)
2194     {
2195       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2196       fflush(stdout);
2197       CkFreeMsg(m);
2198       if(CmiNumPartition()==1){
2199 #ifdef CMK_SMP
2200         for(int i=0;i<CmiMyNodeSize();i++){
2201           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2202           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2203           CmiSetHandler(msg, askProcDataHandlerIdx);
2204           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2205           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2206         }
2207         return;
2208 #endif
2209         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2210         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2211         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2212         CmiSetHandler(msg, askProcDataHandlerIdx);
2213         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2214         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2215       }
2216       else{
2217         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2218         CmiSetHandler(msg, recvPhaseHandlerIdx);
2219         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2220       }
2221     }
2222
2223     // on crashed node
2224     void CkMemRestart(const char *dummy, CkArgMsg *args)
2225     {
2226 #if CMK_MEM_CHECKPOINT
2227       _diePE = CmiMyNode();
2228       CpvAccess( _crashedNode )= CmiMyNode();
2229       CkMemCheckPT::inRestarting = 1;
2230       _discard_charm_message();
2231
2232       /*if(CmiMyRank()==0){
2233         CkCallback cb(qd_callback);
2234         CkStartQD(cb);
2235         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2236         }*/
2237       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2238       if(CmiNumPartition()==1){
2239         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2240         restartT = CmiWallTimer();
2241         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2242         fflush(stdout);
2243         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2244         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2245         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2246         CmiSetHandler(msg, askProcDataHandlerIdx);
2247         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2248         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2249       }
2250       else{
2251         CkCallback cb(qd_callback);
2252         CkStartQD(cb);
2253       }
2254 #else
2255       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2256 #endif
2257     }
2258
2259     // can be called in other files
2260     // return true if it is in restarting
2261     extern "C"
2262       int CkInRestarting()
2263       {
2264 #if CMK_MEM_CHECKPOINT
2265         if (CpvAccess( _crashedNode)!=-1) return 1;
2266         // gzheng
2267         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2268         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2269         return CkMemCheckPT::inRestarting;
2270 #else
2271         return 0;
2272 #endif
2273       }
2274
2275     extern "C"
2276       int CkReplicaAlive()
2277       {
2278         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2279         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2280         return CkMemCheckPT::replicaAlive;
2281
2282         /*if(CkMemCheckPT::replicaDead==1)
2283           return 0;
2284           else          
2285           return 1;*/
2286       }
2287
2288     extern "C"
2289       int CkInCheckpointing()
2290       {
2291         return CkMemCheckPT::inCheckpointing;
2292       }
2293
2294     extern "C"
2295       void CkSetInLdb(){
2296 #if CMK_MEM_CHECKPOINT
2297         CkMemCheckPT::inLoadbalancing = 1;
2298 #endif
2299       }
2300
2301     extern "C"
2302       int CkInLdb(){
2303 #if CMK_MEM_CHECKPOINT
2304         return CkMemCheckPT::inLoadbalancing;
2305 #endif
2306         return 0;
2307       }
2308
2309     extern "C"
2310       void CkResetInLdb(){
2311 #if CMK_MEM_CHECKPOINT
2312         CkMemCheckPT::inLoadbalancing = 0;
2313 #endif
2314       }
2315
2316     /*****************************************************************************
2317       module initialization
2318      *****************************************************************************/
2319
2320     static int arg_where = CkCheckPoint_inMEM;
2321
2322 #if CMK_MEM_CHECKPOINT
2323     void init_memcheckpt(char **argv)
2324     {
2325       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2326         arg_where = CkCheckPoint_inDISK;
2327       }
2328       CpvInitialize(int, use_checksum);
2329       CpvInitialize(int, resilience);
2330       CpvAccess(use_checksum)=0;
2331       CpvAccess(resilience)=0;//TODO should set a default resilience level
2332       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2333         CpvAccess(use_checksum)=1;
2334       }
2335       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2336         CpvAccess(resilience)=1;
2337       }
2338       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2339         CpvAccess(resilience)=2;
2340       }
2341       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2342         CpvAccess(resilience)=3;
2343       }
2344       // initiliazing _crashedNode variable
2345       CpvInitialize(int, _crashedNode);
2346       CpvInitialize(int, _remoteCrashedNode);
2347       CpvAccess(_crashedNode) = -1;
2348       CpvAccess(_remoteCrashedNode) = -1;
2349       init_FI(argv);
2350     }
2351 #endif
2352
2353     class CkMemCheckPTInit: public Chare {
2354       public:
2355         CkMemCheckPTInit(CkArgMsg *m) {
2356 #if CMK_MEM_CHECKPOINT
2357           if (arg_where == CkCheckPoint_inDISK) {
2358             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2359           }
2360
2361           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2362           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2363 #endif
2364         }
2365     };
2366
2367     static void notifyHandler(char *msg)
2368     {
2369 #if CMK_MEM_CHECKPOINT
2370       CmiFree(msg);
2371       /* immediately increase restart phase to filter old messages */
2372       CpvAccess(_curRestartPhase) ++;
2373       CpvAccess(_qd)->flushStates();
2374       _discard_charm_message();
2375
2376 #endif
2377     }
2378
2379     extern "C"
2380       void notify_crash(int node)
2381       {
2382 #ifdef CMK_MEM_CHECKPOINT
2383         CpvAccess( _crashedNode) = node;
2384 #ifdef CMK_SMP
2385         for(int i=0;i<CkMyNodeSize();i++){
2386           CpvAccessOther(_crashedNode,i)=node;
2387         }
2388 #endif
2389         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2390         CkMemCheckPT::inRestarting = 1;
2391
2392         // this may be in interrupt handler, send a message to reset QD
2393         int pe = CmiNodeFirst(CkMyNode());
2394         for(int i=0;i<CkMyNodeSize();i++){
2395           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2396           CmiSetHandler(msg, notifyHandlerIdx);
2397           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2398         }
2399 #endif
2400       }
2401
2402     extern "C" void (*notify_crash_fn)(int node);
2403
2404
2405 #if CMK_CONVERSE_MPI
2406     void buddyDieHandler(char *msg)
2407     {
2408 #if CMK_MEM_CHECKPOINT
2409       // notify
2410       CkMemCheckPT::inRestarting = 1;
2411       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2412       notify_crash(diepe);
2413       // send message to crash pe to let it restart
2414       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2415       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2416       int buddy = obj->BuddyPE(CmiMyPe());
2417       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2418       if (buddy == diepe)  {
2419         mpi_restart_crashed(diepe, newrank);
2420       }
2421 #endif
2422     }
2423
2424     void pingHandler(void *msg)
2425     {
2426       lastPingTime = CmiWallTimer();
2427       CmiFree(msg);
2428     }
2429
2430     void pingCheckHandler()
2431     {
2432 #if CMK_MEM_CHECKPOINT
2433       double now = CmiWallTimer();
2434       if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2435         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2436         int i, pe, buddy;
2437         // tell everyone the buddy dies
2438         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2439         for (i = 1; i < CmiNumPes(); i++) {
2440           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2441           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2442         }
2443         buddy = pe;
2444         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2445         fflush(stdout);
2446         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2447         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2448         CmiSetHandler(msg, buddyDieHandlerIdx);
2449         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2450         //send to everyone in the other world
2451         if(CmiNumPartition()!=1){
2452           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2453           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2454           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2455           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2456         }
2457       }
2458         else 
2459           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2460 #endif
2461       }
2462
2463       void pingBuddy()
2464       {
2465 #if CMK_MEM_CHECKPOINT
2466         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2467         if (obj) {
2468           int buddy = obj->BuddyPE(CkMyPe());
2469           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2470           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2471           CmiSetHandler(msg, pingHandlerIdx);
2472           CmiGetRestartPhase(msg) = 9999;
2473           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2474         }
2475         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2476 #endif
2477       }
2478 #endif
2479
2480       // initproc
2481       void CkRegisterRestartHandler( )
2482       {
2483 #if CMK_MEM_CHECKPOINT
2484         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2485         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2486         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2487         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2488         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2489         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2490
2491         //for replica
2492         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2493         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2494         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2495         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2496         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2497         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2498         replicaBeginFailureInjectionHandlerIdx = CkRegisterHandler((CmiHandler)replicaBeginFailureInjectionHandler);
2499         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2500         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2501         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2502         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2503
2504 #if CMK_CONVERSE_MPI
2505         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2506         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2507         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2508 #endif
2509
2510         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2511         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2512         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2513         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2514         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2515         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2516         CpvInitialize(int,curPointer);
2517         CpvInitialize(int,recvdLocal);
2518         CpvInitialize(int,localChkpDone);
2519         CpvInitialize(int,remoteChkpDone);
2520         CpvInitialize(int,remoteStarted);
2521         CpvInitialize(int,localStarted);
2522         CpvInitialize(int,localReady);
2523         CpvInitialize(int,remoteReady);
2524         CpvInitialize(int,recvdRemote);
2525         CpvInitialize(int,recvdProcChkp);
2526         CpvInitialize(int,localChecksum);
2527         CpvInitialize(int,remoteChecksum);
2528         CpvInitialize(int,recvdArrayChkp);
2529
2530         CpvAccess(procChkptBuf) = NULL;
2531         CpvAccess(buddyBuf) = NULL;
2532         CpvAccess(recoverProcBuf) = NULL;
2533         CpvAccess(recoverArrayBuf) = NULL;
2534         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2535         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2536         CpvAccess(chkpBuf)[0] = NULL;
2537         CpvAccess(chkpBuf)[1] = NULL;
2538         CpvAccess(localProcChkpBuf)[0] = NULL;
2539         CpvAccess(localProcChkpBuf)[1] = NULL;
2540
2541         CpvAccess(curPointer) = 0;
2542         CpvAccess(recvdLocal) = 0;
2543         CpvAccess(localChkpDone) = 0;
2544         CpvAccess(remoteChkpDone) = 0;
2545         CpvAccess(remoteStarted) = 0;
2546         CpvAccess(localStarted) = 0;
2547         CpvAccess(localReady) = 0;
2548         CpvAccess(remoteReady) = 0;
2549         CpvAccess(recvdRemote) = 0;
2550         CpvAccess(recvdProcChkp) = 0;
2551         CpvAccess(localChecksum) = 0;
2552         CpvAccess(remoteChecksum) = 0;
2553         CpvAccess(recvdArrayChkp) = 0;
2554
2555         notify_crash_fn = notify_crash;
2556
2557 #if ! CMK_CONVERSE_MPI
2558         // print pid to kill
2559         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2560         //  sleep(4);
2561 #endif
2562 #endif
2563       }
2564
2565
2566       extern "C"
2567         int CkHasCheckpoints()
2568         {
2569           return checkpointed;
2570         }
2571
2572       /// @todo: the following definitions should be moved to a separate file containing
2573       // structures and functions about fault tolerance strategies
2574
2575       /**
2576        *  * @brief: function for killing a process                                             
2577        *   */
2578 #ifdef CMK_MEM_CHECKPOINT
2579 #if CMK_HAS_GETPID
2580       void killLocal(void *_dummy,double curWallTime){
2581         if(CmiWallTimer()<killTime-1){
2582           printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2583           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2584         }else{ 
2585 #if CMK_CONVERSE_MPI
2586           if(!CkInCheckpointing()&&!CkInRestarting()){
2587             printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2588             CkDieNow();
2589           }else{
2590             if(killFlag == 2){
2591               //delay for 2s 
2592               CcdCallFnAfter(killLocal,NULL,2*1000);        
2593             }
2594           }
2595 #else 
2596           kill(getpid(),SIGKILL);                                               
2597 #endif
2598         }              
2599       } 
2600 #else
2601       void killLocal(void *_dummy,double curWallTime){
2602         CmiAbort("kill() not supported!");
2603       }
2604 #endif
2605       void injectSoftFailure(void *_dummy,double curWallTime){
2606         if(!CkInCheckpointing()&&!CkInRestarting()){
2607           CkPrintf("soft failure injected\n");
2608           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->softFailureInjected = true;
2609         }else{
2610           //next failure
2611           CcdCallFnAfter(injectSoftFailure,NULL,2*1000);
2612         }
2613       }
2614 #endif
2615
2616 #ifdef CMK_MEM_CHECKPOINT
2617       /**
2618        * @brief: reads the file with the kill information
2619        */
2620       void readKillFile(){
2621         FILE *fp=fopen(killFile,"r");
2622         if(!fp){
2623           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2624           return;
2625         }
2626         int proc;
2627         double sec;
2628         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2629           if(proc == CkMyNode() && CkMyRank() == 0){
2630             killTime = CmiWallTimer()+sec;
2631             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2632             CcdCallFnAfter(killLocal,NULL,sec*1000);
2633           }
2634         }
2635         fclose(fp);
2636       }
2637
2638
2639 #if ! CMK_CONVERSE_MPI
2640       void CkDieNow()
2641       {
2642 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2643         // ignored for non-mpi version
2644         CmiPrintf("[%d] die now.\n", CmiMyPe());
2645         killTime = CmiWallTimer()+0.001;
2646         CcdCallFnAfter(killLocal,NULL,1);
2647 #endif
2648       }
2649 #endif
2650
2651 #endif
2652
2653 #include "CkMemCheckpoint.def.h"
2654
2655