use cmireduce during recovery
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   if(CkMyPe()==0){
440     CkPrintf("local max iter is %d\n",localMaxIter);
441   }
442   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
443   int elemCount = CkCountChkpSyncElements();
444   if(CkMyPe()==0)
445     startTime = CmiWallTimer();
446   if(elemCount == 0){
447     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448   }
449 }
450
451 //when one replica failes, decide the next chkp iter
452
453 void CkMemCheckPT::recvIter(int iter){
454   if(maxIter<iter){
455     maxIter=iter;
456   }
457 }
458
459 void CkMemCheckPT::recvMaxIter(int iter){
460   localDecided = false;
461   if(CkMyPe()==0)
462     CkPrintf("checkpoint iteration is %d\n",iter);
463   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
464 }
465
466 void CkMemCheckPT::reachChkpIter(){
467   recvIterCount++;
468   elemCount = CkCountChkpSyncElements();
469   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
470   if(recvIterCount == elemCount){
471     recvIterCount = 0;
472     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
473   }
474 }
475
476 void CkMemCheckPT::startChkp(){
477   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
478   CkStartMemCheckpoint(cpCallback);
479 }
480
481 // called by checkpoint mgr to restore an array element
482 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
483 {
484 #if CMK_MEM_CHECKPOINT
485   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
486   // m->index.print();
487   PUP::fromMem p(m->packData);
488   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
489   CmiAssert(mgr);
490 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
491   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
492 #else
493   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
494 #endif
495
496   // find a list of array elements bound together
497   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
498   CmiAssert(elt);
499   CkLocRec_local *rec = elt->myRec;
500   CkVec<CkMigratable *> list;
501   mgr->migratableList(rec, list);
502   CmiAssert(list.length() > 0);
503   for (int l=0; l<list.length(); l++) {
504     elt = (ArrayElement *)list[l];
505     elt->budPEs[0] = m->bud1;
506     elt->budPEs[1] = m->bud2;
507     //    reset, may not needed now
508     // for now.
509     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
510       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
511       if (c) c->redNo = 0;
512     }
513   }
514 #endif
515 }
516
517 // return 1 if pe is a crashed processor
518 int CkMemCheckPT::isFailed(int pe)
519 {
520   for (int i=0; i<failedPes.length(); i++)
521     if (failedPes[i] == pe) return 1;
522   return 0;
523 }
524
525 // add pe into history list of all failed processors
526 void CkMemCheckPT::failed(int pe)
527 {
528   if (isFailed(pe)) return;
529   failedPes.push_back(pe);
530 }
531
532 int CkMemCheckPT::totalFailed()
533 {
534   return failedPes.length();
535 }
536
537 // create an checkpoint entry for array element of aid with index.
538 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
539 {
540   // error check, no duplicate
541   int idx, len = ckTable.size();
542   for (idx=0; idx<len; idx++) {
543     CkCheckPTInfo *entry = ckTable[idx];
544     if (index == entry->index) {
545       if (loc == entry->locMgr) {
546         // bindTo array elements
547         return;
548       }
549       // for array inheritance, the following check may fail
550       // because ArrayElement constructor of all superclasses are called
551       if (aid == entry->aid) {
552         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
553         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
554       }
555     }
556   }
557   CkCheckPTInfo *newEntry;
558   if (where == CkCheckPoint_inMEM)
559     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
560   else
561     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
562   ckTable.push_back(newEntry);
563   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
564 }
565
566 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
567 {
568 #if !CMK_CHKP_ALL       
569   int buddy = msg->bud1;
570   if (buddy == CkMyPe()) buddy = msg->bud2;
571   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
572   recvData(msg);
573   // ack
574   thisProxy[buddy].gotData();
575 #else
576   chkpTable[0] = NULL;
577   chkpTable[1] = NULL;
578   recvArrayCheckpoint(msg);
579   thisProxy[msg->bud2].gotData();
580 #endif
581 }
582
583 // loop through my checkpoint table and ask checkpointed array elements
584 // to send me checkpoint data.
585 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
586 void CkMemCheckPT::doItNow(int starter)
587 {
588   checkpointed = 1;
589   //cpCallback = cb;
590   cpStarter = starter;
591   inCheckpointing = 1;
592   if (CkMyPe() == cpStarter) {
593     startTime = CmiWallTimer();
594     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
595   }
596 #if CMK_CONVERSE_MPI
597   if(CmiNumPartition()==1)
598 #endif    
599   {
600
601 #if !CMK_CHKP_ALL
602     int len = ckTable.length();
603     for (int i=0; i<len; i++) {
604       CkCheckPTInfo *entry = ckTable[i];
605       // always let the bigger number processor send request
606       //if (CkMyPe() < entry->pNo) continue;
607       // always let the smaller number processor send request, may on same proc
608       if (!isMaster(entry->pNo)) continue;
609       // call inmem_checkpoint to the array element, ask it to send
610       // back checkpoint data via recvData().
611       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
612       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
613     }
614     // if my table is empty, then I am done
615     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
616 #else
617     startArrayCheckpoint();
618 #endif
619     sendProcData();
620   }
621 #if CMK_CONVERSE_MPI
622   else
623   {
624     startCheckpoint();
625   }
626 #endif
627   // pack and send proc level data
628 }
629
630 class MemElementPacker : public CkLocIterator{
631   private:
632     //    CkLocMgr *locMgr;
633     PUP::er &p;
634     std::map<CkHashCode,CkLocation> arrayMap;
635   public:
636     MemElementPacker(PUP::er &p_):p(p_){};
637     void addLocation(CkLocation &loc){
638       CkArrayIndexMax idx = loc.getIndex();
639       arrayMap[idx.hash()] = loc;
640     }
641     void writeCheckpoint(){
642       std::map<CkHashCode, CkLocation>::iterator it;
643       for(it = arrayMap.begin();it!=arrayMap.end();it++){
644         CkLocation loc = it->second;
645         CkLocMgr *locMgr = loc.getManager();
646         CkArrayIndexMax idx = loc.getIndex();
647         CkGroupID gID = locMgr->ckGetGroupID();
648         p|gID;
649         p|idx;
650         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
651       }
652     }
653 };
654
655 void pupAllElements(PUP::er &p){
656 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
657   int numElements;
658   if(!p.isUnpacking()){
659     numElements = CkCountArrayElements();
660   }
661   p | numElements;
662   if(!p.isUnpacking()){
663     MemElementPacker packer(p);
664     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
665     packer.writeCheckpoint();
666   }
667 #endif
668 }
669
670 void CkMemCheckPT::startArrayCheckpoint(){
671 #if CMK_CHKP_ALL
672   int size;
673   {
674     PUP::sizer psizer;
675     pupAllElements(psizer);
676     size = psizer.size();
677   }
678   int packSize = size/sizeof(double)+1;
679   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
680   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
681   msg->len = size;
682   msg->cp_flag = 1;
683   int budPEs[2];
684   msg->bud1=CkMyPe();
685   msg->bud2=ChkptOnPe(CkMyPe());
686   {
687     PUP::toMem p(msg->packData);
688     pupAllElements(p);
689   }
690   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
691   if(chkpTable[0]) delete chkpTable[0];
692   chkpTable[0] = msg;
693   //send the checkpoint to my 
694   recvCount++;
695 #endif
696 }
697
698 void CkMemCheckPT::startCheckpoint(){
699 #if CMK_CONVERSE_MPI
700   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
701     CkPrintf("in start checkpointing!!!!\n");
702   int size;
703   {
704     PUP::sizer p;
705     _handleProcData(p,CmiFalse);
706     size = p.size();
707   }
708   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
709   procMsg->len = size;
710   procMsg->cp_flag = 1;
711   {
712     PUP::toMem p(procMsg->packData);
713     _handleProcData(p,CmiFalse);
714   }
715   int pointer = CpvAccess(curPointer);
716   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
717   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
718
719   {
720     PUP::sizer psizer;
721     pupAllElements(psizer);
722     size = psizer.size();
723   }
724   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
725   msg->len = size;
726   msg->cp_flag = 1;
727   int checksum;
728   {
729     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
730       PUP::checker p(msg->packData);
731       pupAllElements(p);
732       checksum = p.getChecksum();
733     }else{  
734 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
735       PUP::toMem p(msg->packData);
736       pupAllElements(p);
737     }
738   }
739   pointer = CpvAccess(curPointer);
740   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
741   CpvAccess(chkpBuf)[pointer] = msg;
742   if(CkMyPe()==0)
743     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
744   if(CkReplicaAlive()==1){
745     CpvAccess(recvdLocal) = 1;
746     if(CpvAccess(use_checksum)){
747       CpvAccess(localChecksum) = checksum;
748       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
749       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
750       //only one reaplica will send
751       if(CmiMyPartition()==0){
752         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
753         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
754       }
755     }else{
756       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
757       CkPackMessage(&env);
758       if(CmiMyPartition()==0){
759         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
760         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
761       }
762     }
763     if(CmiMyPartition()==0){
764       notifyReplica = 1;
765       thisProxy[CkMyPe()].doneComparison(true);
766     }
767   }
768   if(CpvAccess(recvdRemote)==1){
769     //only partition 1 will do it
770     //compare the checkpoint 
771     int size = CpvAccess(chkpBuf)[pointer]->len;
772     if(CpvAccess(use_checksum)){
773       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
774         thisProxy[CkMyPe()].doneComparison(true);
775       }
776       else{
777         thisProxy[CkMyPe()].doneComparison(false);
778       }
779     }else{
780       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
781         thisProxy[CkMyPe()].doneComparison(true);
782       }
783       else{
784         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
785         thisProxy[CkMyPe()].doneComparison(false);
786       }
787     }
788     if(CkMyPe()==0)
789       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
790   }
791   else{
792     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
793       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
794       if(CpvAccess(remoteReady)==1){
795         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
796         {       
797           int pointer = CpvAccess(curPointer);
798           //send the proc data
799           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
800           procMsg->pointer = pointer;
801           envelope * env = (envelope *)(UsrToEnv(procMsg));
802           CkPackMessage(&env);
803           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
804           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
805           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
806             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
807           }
808         }
809         //send the array checkpoint data
810         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
811         msg->pointer = CpvAccess(curPointer);
812         envelope * env = (envelope *)(UsrToEnv(msg));
813         CkPackMessage(&env);
814         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
815         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
816         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
817           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
818       }else{
819         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
820           int pointer = CpvAccess(curPointer);
821           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
822           (CpvAccess(recoverProcBuf))->pointer = pointer;
823         }
824         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
825         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
826         CpvAccess(localReady) = 1;
827       }
828       //can continue work, no need to wait for my replica
829       int _ret = 1;
830       notifyReplica = 1;
831       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
832       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
833     }
834   }
835 #endif
836 }
837
838 void CkMemCheckPT::doneComparison(bool ret){
839   int _ret = 1;
840   if(!ret){
841     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
842     _ret = 0;
843   }else{
844     _ret = 1;
845   }
846   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
847   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
848 }
849
850 void CkMemCheckPT::doneRComparison(int ret){
851   //    if(CpvAccess(curPointer) == 0){
852   //if(ret==CkNumPes()){
853     CpvAccess(localChkpDone) = 1;
854     if(CpvAccess(remoteChkpDone) ==1){
855       thisProxy.doneBothComparison();
856     }else{
857       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
858     }
859   //}
860   /*else{
861     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
862     startTime = CmiWallTimer();
863     thisProxy.RollBack();
864   }*/
865   if(notifyReplica == 0){
866     //notify the replica am done
867     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
868     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
869     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
870     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
871     notifyReplica = 1;
872   }
873 }
874
875 void CkMemCheckPT::doneBothComparison(){
876   CpvAccess(recvdRemote) = 0;
877   CpvAccess(remoteReady) = 0;
878   CpvAccess(localReady) = 0;
879   CpvAccess(recvdLocal) = 0;
880   CpvAccess(localChkpDone) = 0;
881   CpvAccess(remoteChkpDone) = 0;
882   CpvAccess(remoteStarted) = 0;
883   CpvAccess(localStarted) = 0;
884   CpvAccess(_remoteCrashedNode) = -1;
885   CkMemCheckPT::replicaAlive = 1;
886   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
887   CpvAccess(curPointer)^=1;
888   inCheckpointing = 0;
889   notifyReplica = 0;
890   if(CkMyPe() == 0){
891     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
892   }
893   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
894 }
895
896 void CkMemCheckPT::RollBack(){
897   //restore group data
898   checkpointed = 0;
899   inRestarting = 1;
900   int pointer = CpvAccess(curPointer)^1;//use the previous one
901   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
902   PUP::fromMem p(chkpMsg->packData);    
903
904   //destroy array elements
905   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
906   int numGroups = CkpvAccess(_groupIDTable)->size();
907   for(int i=0;i<numGroups;i++) {
908     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
909     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
910     obj->flushStates();
911     obj->ckJustMigrated();
912   }
913   //restore array elements
914
915   int numElements;
916   p|numElements;
917
918   if(p.isUnpacking()){
919     for(int i=0;i<numElements;i++){
920       CkGroupID gID;
921       CkArrayIndex idx;
922       p|gID;
923       p|idx;
924       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
925       CmiAssert(mgr);
926       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
927     }
928     }
929     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
930     contribute(cb);
931   }
932
933   void CkMemCheckPT::notifyReplicaDie(int pe){
934     replicaAlive = 0;
935     CpvAccess(_remoteCrashedNode) = pe;
936   }
937
938   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
939   {
940 #if CMK_CHKP_ALL
941     int idx = 1;
942     if(msg->bud1 == CkMyPe()){
943       idx = 0;
944     }
945     int isChkpting = msg->cp_flag;
946     if(isChkpting == 1){
947       if(chkpTable[idx]) delete chkpTable[idx];
948     }
949     chkpTable[idx] = msg;
950     if(isChkpting){
951       recvCount++;
952       if(recvCount == 2){
953         if (where == CkCheckPoint_inMEM) {
954           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
955         }
956         else if (where == CkCheckPoint_inDISK) {
957           // another barrier for finalize the writing using fsync
958           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
959           contribute(0,NULL,CkReduction::sum_int,localcb);
960         }
961         else
962           CmiAbort("Unknown checkpoint scheme");
963         recvCount = 0;
964       }
965     }
966 #endif
967   }
968
969   // don't handle array elements
970   static inline void _handleProcData(PUP::er &p, CmiBool create)
971   {
972     // save readonlys, and callback BTW
973     CkPupROData(p);
974
975     // save mainchares 
976     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
977
978 #ifndef CMK_CHARE_USE_PTR
979     // save non-migratable chare
980     CkPupChareData(p);
981 #endif
982
983     // save groups into Groups.dat
984     CkPupGroupData(p,create);
985
986     // save nodegroups into NodeGroups.dat
987     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
988   }
989
990   void CkMemCheckPT::sendProcData()
991   {
992     // find out size of buffer
993     int size;
994     {
995       PUP::sizer p;
996       _handleProcData(p,CmiTrue);
997       size = p.size();
998     }
999     int packSize = size;
1000     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1001     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1002     {
1003       PUP::toMem p(msg->packData);
1004       _handleProcData(p,CmiTrue);
1005     }
1006     msg->pe = CkMyPe();
1007     msg->len = size;
1008     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1009     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1010   }
1011
1012   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1013   {
1014     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1015     CpvAccess(procChkptBuf) = msg;
1016     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1017     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1018   }
1019
1020   // ArrayElement call this function to give us the checkpointed data
1021   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1022   {
1023     int len = ckTable.length();
1024     int idx;
1025     for (idx=0; idx<len; idx++) {
1026       CkCheckPTInfo *entry = ckTable[idx];
1027       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1028     }
1029     CkAssert(idx < len);
1030     int isChkpting = msg->cp_flag;
1031     ckTable[idx]->updateBuffer(msg);
1032     if (isChkpting) {
1033       // all my array elements have returned their inmem data
1034       // inform starter processor that I am done.
1035       recvCount ++;
1036       if (recvCount == ckTable.length()) {
1037         if (where == CkCheckPoint_inMEM) {
1038           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1039         }
1040         else if (where == CkCheckPoint_inDISK) {
1041           // another barrier for finalize the writing using fsync
1042           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1043           contribute(0,NULL,CkReduction::sum_int,localcb);
1044         }
1045         else
1046           CmiAbort("Unknown checkpoint scheme");
1047         recvCount = 0;
1048       } 
1049     }
1050   }
1051
1052   // only used in disk checkpointing
1053   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1054   {
1055     delete m;
1056 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1057     system("sync");
1058 #endif
1059     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1060   }
1061
1062   // only is called on cpStarter when checkpoint is done
1063   void CkMemCheckPT::cpFinish()
1064   {
1065     CmiAssert(CkMyPe() == cpStarter);
1066     peCount++;
1067     // now that all processors have finished, activate callback
1068     if (peCount == 2) 
1069     {
1070       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1071       peCount = 0;
1072       thisProxy.report();
1073     }
1074   }
1075
1076   // for debugging, report checkpoint info
1077   void CkMemCheckPT::report()
1078   {
1079     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1080     inCheckpointing = 0;
1081 #if !CMK_CHKP_ALL
1082     int objsize = 0;
1083     int len = ckTable.length();
1084     for (int i=0; i<len; i++) {
1085       CkCheckPTInfo *entry = ckTable[i];
1086       CmiAssert(entry);
1087       objsize += entry->getSize();
1088     }
1089     CmiAssert(CpvAccess(procChkptBuf));
1090     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1091 #else
1092     if(CkMyPe()==0)
1093       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1094 #endif
1095   }
1096
1097   /*****************************************************************************
1098     RESTART Procedure
1099    *****************************************************************************/
1100
1101   // master processor of two buddies
1102   inline int CkMemCheckPT::isMaster(int buddype)
1103   {
1104 #if 0
1105     int mype = CkMyPe();
1106     //CkPrintf("ismaster: %d %d\n", pe, mype);
1107     if (CkNumPes() - totalFailed() == 2) {
1108       return mype > buddype;
1109     }
1110     for (int i=1; i<CkNumPes(); i++) {
1111       int me = (buddype+i)%CkNumPes();
1112       if (isFailed(me)) continue;
1113       if (me == mype) return 1;
1114       else return 0;
1115     }
1116     return 0;
1117 #else
1118     // smaller one
1119     int mype = CkMyPe();
1120     //CkPrintf("ismaster: %d %d\n", pe, mype);
1121     if (CkNumPes() - totalFailed() == 2) {
1122       return mype < buddype;
1123     }
1124 #if NODE_CHECKPOINT
1125     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1126     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1127 #else
1128       for (int i=1; i<CkNumPes(); i++) {
1129 #endif
1130         int me = (mype+i)%CkNumPes();
1131         if (isFailed(me)) continue;
1132         if (me == buddype) return 1;
1133         else return 0;
1134       }
1135       return 0;
1136 #endif
1137     }
1138
1139
1140
1141 #if 0
1142     // helper class to pup all elements that belong to same ckLocMgr
1143     class ElementDestoryer : public CkLocIterator {
1144       private:
1145         CkLocMgr *locMgr;
1146       public:
1147         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1148         void addLocation(CkLocation &loc) {
1149           CkArrayIndex idx=loc.getIndex();
1150           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1151           loc.destroy();
1152         }
1153     };
1154 #endif
1155
1156     // restore the bitmap vector for LB
1157     void CkMemCheckPT::resetLB(int diepe)
1158     {
1159 #if CMK_LBDB_ON
1160       int i;
1161       char *bitmap = new char[CkNumPes()];
1162       // set processor available bitmap
1163       get_avail_vector(bitmap);
1164       for (i=0; i<failedPes.length(); i++)
1165         bitmap[failedPes[i]] = 0; 
1166       bitmap[diepe] = 0;
1167
1168 #if CK_NO_PROC_POOL
1169       set_avail_vector(bitmap);
1170 #endif
1171
1172       // if I am the crashed pe, rebuild my failedPEs array
1173       if (CkMyNode() == diepe)
1174         for (i=0; i<CkNumPes(); i++) 
1175           if (bitmap[i]==0) failed(i);
1176
1177       delete [] bitmap;
1178 #endif
1179     }
1180
1181     static double restartT;
1182
1183     // in case when failedPe dies, everybody go through its checkpoint table:
1184     // destory all array elements
1185     // recover lost buddies
1186     // reconstruct all array elements from check point data
1187     // called on all processors
1188     void CkMemCheckPT::restart(int diePe)
1189     {
1190 #if CMK_MEM_CHECKPOINT
1191       thisFailedPe = diePe;
1192       double curTime = CmiWallTimer();
1193       if (CkMyPe() == thisFailedPe){
1194         restartT = CmiWallTimer();
1195         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1196       }
1197       stage = (char*)"resetLB";
1198       startTime = curTime;
1199       if (CkMyPe() == thisFailedPe)
1200         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1201
1202 //#if CK_NO_PROC_POOL
1203 //      failed(diePe);  // add into the list of failed pes
1204 //#endif
1205
1206 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1207
1208       inRestarting = 1;
1209
1210       // disable load balancer's barrier
1211       //if (CkMyPe() != diePe) resetLB(diePe);
1212
1213       CKLOCMGR_LOOP(mgr->startInserting(););
1214
1215
1216       if(CmiNumPartition()==1){
1217         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1218       }else{
1219         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1220         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1221       }
1222 #endif
1223     }
1224
1225     // loally remove all array elements
1226     void CkMemCheckPT::removeArrayElements()
1227     {
1228 #if CMK_MEM_CHECKPOINT
1229       int len = ckTable.length();
1230       double curTime = CmiWallTimer();
1231       if (CkMyPe() == thisFailedPe) 
1232         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1233       stage = (char*)"removeArrayElements";
1234       startTime = curTime;
1235
1236       //  if (cpCallback.isInvalid()) 
1237       //          CkPrintf("invalid pe %d\n",CkMyPe());
1238       //          CkAbort("Didn't set restart callback\n");;
1239       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1240
1241       // get rid of all buffering and remote recs
1242       // including destorying all array elements
1243 #if CK_NO_PROC_POOL  
1244       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1245 #else
1246       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1247 #endif
1248       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1249 #endif
1250     }
1251
1252     // flush state in reduction manager
1253     void CkMemCheckPT::resetReductionMgr()
1254     {
1255       if (CkMyPe() == thisFailedPe) 
1256         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1257       int numGroups = CkpvAccess(_groupIDTable)->size();
1258       for(int i=0;i<numGroups;i++) {
1259         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1260         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1261         obj->flushStates();
1262         obj->ckJustMigrated();
1263       }
1264       // reset again
1265       //CpvAccess(_qd)->flushStates();
1266       if(CmiNumPartition()==1){
1267         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1268       }
1269       else{
1270         if (CkMyPe() == thisFailedPe) 
1271           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1272         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1273       }
1274     }
1275
1276     // recover the lost buddies
1277     void CkMemCheckPT::recoverBuddies()
1278     {
1279       int idx;
1280       int len = ckTable.length();
1281       // ready to flush reduction manager
1282       // cannot be CkMemCheckPT::restart because destory will modify states
1283       double curTime = CmiWallTimer();
1284       if (CkMyPe() == thisFailedPe)
1285         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1286       stage = (char *)"recoverBuddies";
1287       if (CkMyPe() == thisFailedPe)
1288         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1289       startTime = curTime;
1290
1291       // recover buddies
1292       expectCount = 0;
1293 #if !CMK_CHKP_ALL
1294       for (idx=0; idx<len; idx++) {
1295         CkCheckPTInfo *entry = ckTable[idx];
1296         if (entry->pNo == thisFailedPe) {
1297 #if CK_NO_PROC_POOL
1298           // find a new buddy
1299           int budPe = BuddyPE(CkMyPe());
1300 #else
1301           int budPe = thisFailedPe;
1302 #endif
1303           CkArrayCheckPTMessage *msg = entry->getCopy();
1304           msg->bud1 = budPe;
1305           msg->bud2 = CkMyPe();
1306           msg->cp_flag = 0;            // not checkpointing
1307           thisProxy[budPe].recoverEntry(msg);
1308           expectCount ++;
1309         }
1310       }
1311 #else
1312       //send to failed pe
1313       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1314 #if CK_NO_PROC_POOL
1315         // find a new buddy
1316         int budPe = BuddyPE(CkMyPe());
1317 #else
1318         int budPe = thisFailedPe;
1319 #endif
1320         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1321         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1322         msg->cp_flag = 0;            // not checkpointing
1323         msg->bud1 = budPe;
1324         msg->bud2 = CkMyPe();
1325         thisProxy[budPe].recoverEntry(msg);
1326         expectCount ++;
1327       }
1328 #endif
1329
1330       if (expectCount == 0) {
1331         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1332       }
1333       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1334     }
1335
1336     void CkMemCheckPT::gotData()
1337     {
1338       ackCount ++;
1339       if (ackCount == expectCount) {
1340         ackCount = 0;
1341         expectCount = -1;
1342         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1343         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1344       }
1345     }
1346
1347     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1348     {
1349
1350       for (int i=0; i<n; i++) {
1351         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1352         mgr->updateLocation(idx[i], nowOnPe);
1353       }
1354       thisProxy[nowOnPe].gotReply();
1355     }
1356
1357     // restore array elements
1358     void CkMemCheckPT::recoverArrayElements()
1359     {
1360       double curTime = CmiWallTimer();
1361       int len = ckTable.length();
1362       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1363       stage = (char *)"recoverArrayElements";
1364       startTime = curTime;
1365       int flag = 0;
1366       // recover all array elements
1367       int count = 0;
1368
1369 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1370       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1371       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1372 #endif
1373
1374 #if !CMK_CHKP_ALL
1375       for (int idx=0; idx<len; idx++)
1376       {
1377         CkCheckPTInfo *entry = ckTable[idx];
1378 #if CK_NO_PROC_POOL
1379         // the bigger one will do 
1380         //    if (CkMyPe() < entry->pNo) continue;
1381         if (!isMaster(entry->pNo)) continue;
1382 #else
1383         // smaller one do it, which has the original object
1384         if (CkMyPe() == entry->pNo+1 || 
1385             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1386 #endif
1387         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1388
1389         entry->updateBuddy(CkMyPe(), entry->pNo);
1390         CkArrayCheckPTMessage *msg = entry->getCopy();
1391         // gzheng
1392         //thisProxy[CkMyPe()].inmem_restore(msg);
1393         inmem_restore(msg);
1394 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1395         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1396         int homePe = mgr->homePe(msg->index);
1397         if (homePe != CkMyPe()) {
1398           gmap[homePe].push_back(msg->locMgr);
1399           imap[homePe].push_back(msg->index);
1400         }
1401 #endif
1402         CkFreeMsg(msg);
1403         count ++;
1404       }
1405 #else
1406       char * packData;
1407       if(CmiNumPartition()==1){
1408         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1409         packData = (char *)msg->packData;
1410       }
1411       else{
1412         int pointer = CpvAccess(curPointer);
1413         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1414         packData = msg->packData;
1415       }
1416 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1417       recoverAll(packData,gmap,imap);
1418 #else
1419       recoverAll(packData);
1420 #endif
1421 #endif
1422 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1423       for (int i=0; i<CkNumPes(); i++) {
1424         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1425           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1426           flag++;       
1427         }
1428       }
1429       delete [] imap;
1430       delete [] gmap;
1431 #endif
1432       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1433       //if (CkMyPe() == thisFailedPe)
1434
1435       CKLOCMGR_LOOP(mgr->doneInserting(););
1436
1437       // _crashedNode = -1;
1438       CpvAccess(_crashedNode) = -1;
1439 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1440       if (CkMyPe() == 0)
1441         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1442 #else
1443       if(flag == 0)
1444       {
1445         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1446       }
1447 #endif
1448     }
1449
1450     void CkMemCheckPT::gotReply(){
1451       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1452     }
1453
1454     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1455 #if CMK_CHKP_ALL
1456       PUP::fromMem p(packData);
1457       int numElements;
1458       p|numElements;
1459       if(p.isUnpacking()){
1460         for(int i=0;i<numElements;i++){
1461           CkGroupID gID;
1462           CkArrayIndex idx;
1463           p|gID;
1464           p|idx;
1465           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1466           int homePe = mgr->homePe(idx);
1467 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1468           mgr->resume(idx,p,CmiTrue,CmiTrue);
1469 #else
1470           if(CmiNumPartition()==1)
1471             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1472           else{
1473             if(CkMyPe()==thisFailedPe){
1474               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1475             }
1476             else{
1477               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1478             }
1479           }
1480 #endif
1481 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1482           homePe = mgr->homePe(idx);
1483           if (homePe != CkMyPe()) {
1484             gmap[homePe].push_back(gID);
1485             imap[homePe].push_back(idx);
1486           }
1487 #endif
1488         }
1489       }
1490 #endif
1491     }
1492
1493
1494     // on every processor
1495     // turn load balancer back on
1496     void CkMemCheckPT::finishUp()
1497     {
1498       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1499       //CKLOCMGR_LOOP(mgr->doneInserting(););
1500
1501       if (CkMyPe() == thisFailedPe)
1502       {
1503         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1504         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1505         fflush(stdout);
1506       }
1507       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1508       inRestarting = 0;
1509       maxIter = -1;
1510 #if CMK_CONVERSE_MPI    
1511       if(CmiNumPartition()!=1){
1512         CpvAccess(recvdProcChkp) = 0;
1513         CpvAccess(recvdArrayChkp) = 0;
1514         CpvAccess(curPointer)^=1;
1515         //notify my replica, restart is done
1516         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1517           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1518           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1519           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1520         }
1521       }
1522       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1523         lastPingTime = CmiWallTimer();
1524         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1525       }
1526 #endif
1527
1528 #if CK_NO_PROC_POOL
1529 #if NODE_CHECKPOINT
1530       int numnodes = CmiNumPhysicalNodes();
1531 #else
1532       int numnodes = CkNumPes();
1533 #endif
1534       if (numnodes-totalFailed() <=2) {
1535         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1536         _memChkptOn = 0;
1537       }
1538 #endif
1539     }
1540
1541     void CkMemCheckPT::recoverFromSoftFailure()
1542     {
1543       inRestarting = 0;
1544       maxIter = -1;
1545       CpvAccess(recvdRemote) = 0;
1546       CpvAccess(recvdLocal) = 0;
1547       CpvAccess(localChkpDone) = 0;
1548       CpvAccess(remoteChkpDone) = 0;
1549       CpvAccess(remoteReady) = 0;
1550       CpvAccess(localReady) = 0;
1551       inCheckpointing = 0;
1552       notifyReplica = 0;
1553       CpvAccess(remoteStarted) = 0;
1554       CpvAccess(localStarted) = 0;
1555       CpvAccess(_remoteCrashedNode) = -1;
1556       CkMemCheckPT::replicaAlive = 1;
1557       inCheckpointing = 0;
1558       notifyReplica = 0;
1559       if(CkMyPe() == 0){
1560         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1561       }
1562       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1563     }
1564     // called only on 0
1565     void CkMemCheckPT::quiescence(CkCallback &cb)
1566     {
1567       static int pe_count = 0;
1568       pe_count ++;
1569       CmiAssert(CkMyPe() == 0);
1570       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1571       if (pe_count == CkNumPes()) {
1572         pe_count = 0;
1573         cb.send();
1574       }
1575     }
1576
1577     // User callable function - to start a checkpoint
1578     // callback cb is used to pass control back
1579     void CkStartMemCheckpoint(CkCallback &cb)
1580     {
1581 #if CMK_MEM_CHECKPOINT
1582       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1583       /*if (_memChkptOn == 0) {
1584         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1585         cb.send();
1586         return;
1587         }*/
1588       if (CkInRestarting()) {
1589         // trying to checkpointing during restart
1590         cb.send();
1591         return;
1592       }
1593       // store user callback and user data
1594       CkMemCheckPT::cpCallback = cb;
1595
1596
1597       //send to my replica that checkpoint begins 
1598       if(CkReplicaAlive()==1){
1599         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1600         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1601         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1602       }
1603       CpvAccess(localStarted) = 1;
1604       // broadcast to start check pointing
1605       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1606         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1607         checkptMgr.doItNow(CkMyPe());
1608       }
1609 #else
1610       // when mem checkpoint is disabled, invike cb immediately
1611       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1612       cb.send();
1613 #endif
1614     }
1615
1616     void CkRestartCheckPoint(int diePe)
1617     {
1618       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1619       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1620       // broadcast
1621       checkptMgr.restart(diePe);
1622     }
1623
1624     static int _diePE = -1;
1625
1626     // callback function used locally by ccs handler
1627     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1628     {
1629       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1630       CkRestartCheckPoint(_diePE);
1631     }
1632
1633
1634     // called on crashed PE
1635     static void restartBeginHandler(char *msg)
1636     {
1637       CmiFree(msg);
1638 #if CMK_MEM_CHECKPOINT
1639 #if CMK_USE_BARRIER
1640       if(CkMyPe()!=_diePE){
1641         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1642         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1643         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1644         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1645       }else{
1646         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1647         CkRestartCheckPointCallback(NULL, NULL);
1648       }
1649 #else
1650       static int count = 0;
1651       CmiAssert(CkMyPe() == _diePE);
1652       count ++;
1653       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1654         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1655         CkRestartCheckPointCallback(NULL, NULL);
1656         count = 0;
1657       }
1658 #endif
1659 #endif
1660     }
1661
1662     extern void _discard_charm_message();
1663     extern void _resume_charm_message();
1664
1665     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1666       return data;
1667     }
1668
1669     static void restartBcastHandler(char *msg)
1670     {
1671 #if CMK_MEM_CHECKPOINT
1672       // advance phase counter
1673       CkMemCheckPT::inRestarting = 1;
1674       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1675       // gzheng
1676       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1677
1678       if (CkMyPe()==_diePE)
1679         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1680
1681       // reset QD counters
1682       /*  gzheng
1683           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1684        */
1685
1686       /*  gzheng
1687           if (CkMyPe()==_diePE)
1688           CkRestartCheckPointCallback(NULL, NULL);
1689        */
1690       CmiFree(msg);
1691
1692       _resume_charm_message();
1693
1694       // reduction
1695       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1696       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1697 #if CMK_USE_BARRIER
1698       //CmiPrintf("before reduce\n");   
1699       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1700       //CmiPrintf("after reduce\n");    
1701 #else
1702       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1703 #endif 
1704       checkpointed = 0;
1705 #endif
1706     }
1707
1708     extern void _initDone();
1709
1710     bool compare(char * buf1, char *buf2){
1711       if(CkMyPe()==0)
1712         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1713       PUP::checker pchecker(buf1,buf2);
1714       pchecker.skip();
1715       int numElements;
1716       pchecker|numElements;
1717       for(int i=0;i<numElements;i++){
1718         CkGroupID gID;
1719         CkArrayIndex idx;
1720
1721         pchecker|gID;
1722         pchecker|idx;
1723
1724         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1725         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1726       }
1727       if(CkMyPe()==0)
1728         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1729       //int fault_num = pchecker.getFaultNum();
1730       bool result = pchecker.getResult();
1731       /*if(!result){
1732         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1733       }*/
1734       return result;
1735     }
1736     int getChecksum(char * buf){
1737       PUP::checker pchecker(buf);
1738       pchecker.skip();
1739       int numElements;
1740       pchecker|numElements;
1741 //      CkPrintf("num %d\n",numElements);
1742       for(int i=0;i<numElements;i++){
1743         CkGroupID gID;
1744         CkArrayIndex idx;
1745
1746         pchecker|gID;
1747         pchecker|idx;
1748
1749         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1750         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1751       }
1752       return pchecker.getChecksum();
1753     }
1754
1755     static void recvRemoteChkpHandler(char *msg){
1756       CpvAccess(remoteChkpDone) = 1;
1757       if(CpvAccess(use_checksum)){
1758         if(CkMyPe()==0)
1759           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1760         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1761         CpvAccess(recvdRemote) = 1;
1762         if(CpvAccess(recvdLocal)==1){
1763           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1764             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1765           }
1766           else{
1767             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1768           }
1769         }
1770       }else{
1771         envelope *env = (envelope *)msg;
1772         CkUnpackMessage(&env);
1773         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1774         if(CpvAccess(recvdLocal)==1){
1775           int pointer = CpvAccess(curPointer);
1776           int size = CpvAccess(chkpBuf)[pointer]->len;
1777           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1778             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1779           }else
1780           {
1781             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1782           }
1783           delete chkpMsg;
1784           if(CkMyPe()==0)
1785             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1786         }else{
1787           CpvAccess(recvdRemote) = 1;
1788           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1789           CpvAccess(buddyBuf) = chkpMsg;
1790         }
1791       }
1792     }
1793
1794     static void replicaRecoverHandler(char *msg){
1795       //fflush(stdout);
1796       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1797       CpvAccess(remoteChkpDone) = 1;
1798       if(CpvAccess(localChkpDone) == 1)
1799         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1800       CmiFree(msg);
1801     }
1802
1803     static void replicaChkpDoneHandler(char *msg){
1804       CpvAccess(remoteChkpDone) = 1;
1805       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1806       if(CpvAccess(localChkpDone) == 1)
1807         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1808       CmiFree(msg);
1809     }
1810
1811     static void replicaDieHandler(char * msg){
1812 #if CMK_CONVERSE_MPI
1813       //broadcast to every one in my replica
1814       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1815       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1816 #endif
1817     }
1818
1819     static void replicaChkpStartHandler(char * msg){
1820       CpvAccess(remoteStarted) =1;
1821       if(CpvAccess(localStarted)==1){    
1822         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1823         checkptMgr.doItNow(0);
1824       }
1825     }
1826
1827
1828     static void replicaDieBcastHandler(char *msg){
1829       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1830       CpvAccess(_remoteCrashedNode) = diePe;
1831       if(CkMyPe()==diePe){
1832         CmiPrintf("pe %d in replicad word die\n",diePe);
1833         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1834         fflush(stdout);
1835       }
1836       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1837         find_spare_mpirank(diePe,CmiMyPartition()^1);
1838       }
1839       //broadcast to my partition to get local max iter
1840       if(CpvAccess(resilience)!=1){
1841         CkMemCheckPT::replicaAlive = 0;
1842         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1843       }
1844       CmiFree(msg);
1845     }
1846
1847     static void recoverRemoteProcDataHandler(char *msg){
1848       envelope *env = (envelope *)msg;
1849       CkUnpackMessage(&env);
1850       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1851
1852       //store the checkpoint
1853       int pointer = procMsg->pointer;
1854
1855
1856       if(CkMyPe()==CpvAccess(_crashedNode)){
1857         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1858         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1859         PUP::fromMem p(procMsg->packData);
1860         _handleProcData(p,CmiTrue);
1861         _initDone();
1862       }
1863       else{
1864         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1865         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1866         //_handleProcData(p,CmiFalse);
1867       }
1868       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1869       CKLOCMGR_LOOP(mgr->startInserting(););
1870
1871       CpvAccess(recvdProcChkp) =1;
1872       if(CpvAccess(recvdArrayChkp)==1){
1873         _resume_charm_message();
1874         _diePE = CpvAccess(_crashedNode);
1875         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1876         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1877         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1878 #if CMK_USE_BARRIER
1879       //CmiPrintf("before reduce\n");   
1880         if(CpvAccess(resilience)==1){
1881           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1882         }else
1883           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1884       //CmiPrintf("after reduce\n");    
1885 #else
1886         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1887 #endif 
1888       }
1889     }
1890
1891     static void recoverRemoteArrayDataHandler(char *msg){
1892       envelope *env = (envelope *)msg;
1893       CkUnpackMessage(&env);
1894       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1895
1896       //store the checkpoint
1897       int pointer = chkpMsg->pointer;
1898       CpvAccess(curPointer) = pointer;
1899       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1900       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1901       CpvAccess(recvdArrayChkp) =1;
1902       CkMemCheckPT::inRestarting = 1;
1903       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1904         _resume_charm_message();
1905         _diePE = CpvAccess(_crashedNode);
1906         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1907         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1908         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1909         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1910 #if CMK_USE_BARRIER
1911       //CmiPrintf("before reduce\n");   
1912         if(CpvAccess(resilience)==1){
1913           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1914         }else
1915           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1916       //CmiPrintf("after reduce\n");    
1917 #else
1918         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1919 #endif 
1920       }
1921     }
1922
1923     static void recvPhaseHandler(char * msg)
1924     {
1925       CpvAccess(_curRestartPhase)--;
1926       CkMemCheckPT::inRestarting = 1;
1927       CmiFree(msg);
1928       //notify the buddy in the replica now i can receive the checkpoint msg
1929       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1930         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1931         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1932         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1933         //timer start
1934         if(CpvAccess(_crashedNode)==CkMyPe())
1935           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1936         if(CpvAccess(_crashedNode)==CmiMyPe())
1937           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1938       }else{
1939         CpvAccess(curPointer)^=1;
1940         CkMemCheckPT::inRestarting = 1;
1941         _resume_charm_message();
1942         _diePE = CpvAccess(_crashedNode);
1943       }
1944     }
1945     
1946     static void askRecoverDataHandler(char * msg){
1947       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1948         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1949       if(CpvAccess(resilience)!=1){
1950         CpvAccess(remoteReady)=1;
1951         if(CpvAccess(localReady)==1){
1952           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1953           {     
1954             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1955             CkPackMessage(&env);
1956             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1957             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1958             CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
1959           }
1960           //send the array checkpoint data
1961           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1962           CkPackMessage(&env);
1963           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1964           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1965           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1966             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1967         }
1968       }else{
1969         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
1970         int pointer = CpvAccess(curPointer)^1;
1971         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1972         procMsg->pointer = pointer;
1973         envelope * env = (envelope *)(UsrToEnv(procMsg));
1974         CkPackMessage(&env);
1975         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1976         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1977         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
1978         
1979         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1980         arrayMsg->pointer = pointer;
1981         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1982         CkPackMessage(&env1);
1983         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1984         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1985         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1986       }
1987     }
1988     // called on crashed processor
1989     static void recoverProcDataHandler(char *msg)
1990     {
1991 #if CMK_MEM_CHECKPOINT
1992       int i;
1993       envelope *env = (envelope *)msg;
1994       CkUnpackMessage(&env);
1995       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1996       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1997       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1998       PUP::fromMem p(procMsg->packData);
1999       _handleProcData(p,CmiTrue);
2000
2001       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2002       // gzheng
2003       CKLOCMGR_LOOP(mgr->startInserting(););
2004
2005       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2006       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2007       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2008       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2009
2010       _initDone();
2011       //   CpvAccess(_qd)->flushStates();
2012       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2013 #endif
2014     }
2015     //for replica, got the phase number from my neighbor processor in the current partition
2016     static void askPhaseHandler(char *msg)
2017     {
2018 #if CMK_MEM_CHECKPOINT
2019       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2020       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2021       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2022       CmiSetHandler(msg, recvPhaseHandlerIdx);
2023       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2024 #endif
2025     }
2026     // called on its backup processor
2027     // get backup message buffer and sent to crashed processor
2028     static void askProcDataHandler(char *msg)
2029     {
2030 #if CMK_MEM_CHECKPOINT
2031       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2032       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2033       if (CpvAccess(procChkptBuf) == NULL)  {
2034         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2035         CkAbort("no checkpoint found");
2036       }
2037       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2038       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2039
2040       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2041
2042       CkPackMessage(&env);
2043       CmiSetHandler(env, recoverProcDataHandlerIdx);
2044       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2045       CpvAccess(procChkptBuf) = NULL;
2046       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2047 #endif
2048     }
2049
2050     // called on PE 0
2051     void qd_callback(void *m)
2052     {
2053       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2054       fflush(stdout);
2055       CkFreeMsg(m);
2056       if(CmiNumPartition()==1){
2057 #ifdef CMK_SMP
2058         for(int i=0;i<CmiMyNodeSize();i++){
2059           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2060           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2061           CmiSetHandler(msg, askProcDataHandlerIdx);
2062           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2063           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2064         }
2065         return;
2066 #endif
2067         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2068         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2069         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2070         CmiSetHandler(msg, askProcDataHandlerIdx);
2071         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2072         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2073       }
2074       else{
2075         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2076         CmiSetHandler(msg, recvPhaseHandlerIdx);
2077         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2078       }
2079     }
2080
2081     // on crashed node
2082     void CkMemRestart(const char *dummy, CkArgMsg *args)
2083     {
2084 #if CMK_MEM_CHECKPOINT
2085       _diePE = CmiMyNode();
2086       CpvAccess( _crashedNode )= CmiMyNode();
2087       CkMemCheckPT::inRestarting = 1;
2088       _discard_charm_message();
2089
2090       /*if(CmiMyRank()==0){
2091         CkCallback cb(qd_callback);
2092         CkStartQD(cb);
2093         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2094         }*/
2095       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2096       if(CmiNumPartition()==1){
2097         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2098         restartT = CmiWallTimer();
2099         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2100         fflush(stdout);
2101         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2102         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2103         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2104         CmiSetHandler(msg, askProcDataHandlerIdx);
2105         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2106         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2107       }
2108       else{
2109         CkCallback cb(qd_callback);
2110         CkStartQD(cb);
2111       }
2112 #else
2113       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2114 #endif
2115     }
2116
2117     // can be called in other files
2118     // return true if it is in restarting
2119     extern "C"
2120       int CkInRestarting()
2121       {
2122 #if CMK_MEM_CHECKPOINT
2123         if (CpvAccess( _crashedNode)!=-1) return 1;
2124         // gzheng
2125         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2126         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2127         return CkMemCheckPT::inRestarting;
2128 #else
2129         return 0;
2130 #endif
2131       }
2132
2133     extern "C"
2134       int CkReplicaAlive()
2135       {
2136         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2137         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2138         return CkMemCheckPT::replicaAlive;
2139
2140         /*if(CkMemCheckPT::replicaDead==1)
2141           return 0;
2142           else          
2143           return 1;*/
2144       }
2145
2146     extern "C"
2147       int CkInCheckpointing()
2148       {
2149         return CkMemCheckPT::inCheckpointing;
2150       }
2151
2152     extern "C"
2153       void CkSetInLdb(){
2154 #if CMK_MEM_CHECKPOINT
2155         CkMemCheckPT::inLoadbalancing = 1;
2156 #endif
2157       }
2158
2159     extern "C"
2160       int CkInLdb(){
2161 #if CMK_MEM_CHECKPOINT
2162         return CkMemCheckPT::inLoadbalancing;
2163 #endif
2164         return 0;
2165       }
2166
2167     extern "C"
2168       void CkResetInLdb(){
2169 #if CMK_MEM_CHECKPOINT
2170         CkMemCheckPT::inLoadbalancing = 0;
2171 #endif
2172       }
2173
2174     /*****************************************************************************
2175       module initialization
2176      *****************************************************************************/
2177
2178     static int arg_where = CkCheckPoint_inMEM;
2179
2180 #if CMK_MEM_CHECKPOINT
2181     void init_memcheckpt(char **argv)
2182     {
2183       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2184         arg_where = CkCheckPoint_inDISK;
2185       }
2186       CpvInitialize(int, use_checksum);
2187       CpvInitialize(int, resilience);
2188       CpvAccess(use_checksum)=0;
2189       CpvAccess(resilience)=0;
2190       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2191         CpvAccess(use_checksum)=1;
2192       }
2193       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2194         CpvAccess(resilience)=1;
2195       }
2196       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2197         CpvAccess(resilience)=2;
2198       }
2199       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2200         CpvAccess(resilience)=3;
2201       }
2202       // initiliazing _crashedNode variable
2203       CpvInitialize(int, _crashedNode);
2204       CpvInitialize(int, _remoteCrashedNode);
2205       CpvAccess(_crashedNode) = -1;
2206       CpvAccess(_remoteCrashedNode) = -1;
2207       init_FI(argv);
2208     }
2209 #endif
2210
2211     class CkMemCheckPTInit: public Chare {
2212       public:
2213         CkMemCheckPTInit(CkArgMsg *m) {
2214 #if CMK_MEM_CHECKPOINT
2215           if (arg_where == CkCheckPoint_inDISK) {
2216             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2217           }
2218           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2219           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2220 #endif
2221         }
2222     };
2223
2224     static void notifyHandler(char *msg)
2225     {
2226 #if CMK_MEM_CHECKPOINT
2227       CmiFree(msg);
2228       /* immediately increase restart phase to filter old messages */
2229       CpvAccess(_curRestartPhase) ++;
2230       CpvAccess(_qd)->flushStates();
2231       _discard_charm_message();
2232
2233 #endif
2234     }
2235
2236     extern "C"
2237       void notify_crash(int node)
2238       {
2239 #ifdef CMK_MEM_CHECKPOINT
2240         CpvAccess( _crashedNode) = node;
2241 #ifdef CMK_SMP
2242         for(int i=0;i<CkMyNodeSize();i++){
2243           CpvAccessOther(_crashedNode,i)=node;
2244         }
2245 #endif
2246         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2247         CkMemCheckPT::inRestarting = 1;
2248
2249         // this may be in interrupt handler, send a message to reset QD
2250         int pe = CmiNodeFirst(CkMyNode());
2251         for(int i=0;i<CkMyNodeSize();i++){
2252           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2253           CmiSetHandler(msg, notifyHandlerIdx);
2254           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2255         }
2256 #endif
2257       }
2258
2259     extern "C" void (*notify_crash_fn)(int node);
2260
2261
2262 #if CMK_CONVERSE_MPI
2263     void buddyDieHandler(char *msg)
2264     {
2265 #if CMK_MEM_CHECKPOINT
2266       // notify
2267       CkMemCheckPT::inRestarting = 1;
2268       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2269       notify_crash(diepe);
2270       // send message to crash pe to let it restart
2271       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2272       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2273       int buddy = obj->BuddyPE(CmiMyPe());
2274       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2275       if (buddy == diepe)  {
2276         mpi_restart_crashed(diepe, newrank);
2277       }
2278 #endif
2279     }
2280
2281     void pingHandler(void *msg)
2282     {
2283       lastPingTime = CmiWallTimer();
2284       CmiFree(msg);
2285     }
2286
2287     void pingCheckHandler()
2288     {
2289 #if CMK_MEM_CHECKPOINT
2290       double now = CmiWallTimer();
2291       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2292         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2293         int i, pe, buddy;
2294         // tell everyone the buddy dies
2295         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2296         for (i = 1; i < CmiNumPes(); i++) {
2297           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2298           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2299         }
2300         buddy = pe;
2301         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2302         fflush(stdout);
2303         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2304         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2305         CmiSetHandler(msg, buddyDieHandlerIdx);
2306         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2307         //send to everyone in the other world
2308         if(CmiNumPartition()!=1){
2309           //for(int i=0;i<CmiNumPes();i++){
2310             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2311             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2312             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2313             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2314           //}
2315         }
2316       }
2317         else 
2318           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2319 #endif
2320       }
2321
2322       void pingBuddy()
2323       {
2324 #if CMK_MEM_CHECKPOINT
2325         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2326         if (obj) {
2327           int buddy = obj->BuddyPE(CkMyPe());
2328           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2329           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2330           CmiSetHandler(msg, pingHandlerIdx);
2331           CmiGetRestartPhase(msg) = 9999;
2332           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2333         }
2334         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2335 #endif
2336       }
2337 #endif
2338
2339       // initproc
2340       void CkRegisterRestartHandler( )
2341       {
2342 #if CMK_MEM_CHECKPOINT
2343         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2344         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2345         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2346         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2347         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2348         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2349
2350         //for replica
2351         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2352         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2353         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2354         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2355         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2356         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2357         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2358         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2359         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2360         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2361
2362 #if CMK_CONVERSE_MPI
2363         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2364         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2365         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2366 #endif
2367
2368         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2369         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2370         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2371         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2372         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2373         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2374         CpvInitialize(int,curPointer);
2375         CpvInitialize(int,recvdLocal);
2376         CpvInitialize(int,localChkpDone);
2377         CpvInitialize(int,remoteChkpDone);
2378         CpvInitialize(int,remoteStarted);
2379         CpvInitialize(int,localStarted);
2380         CpvInitialize(int,localReady);
2381         CpvInitialize(int,remoteReady);
2382         CpvInitialize(int,recvdRemote);
2383         CpvInitialize(int,recvdProcChkp);
2384         CpvInitialize(int,localChecksum);
2385         CpvInitialize(int,remoteChecksum);
2386         CpvInitialize(int,recvdArrayChkp);
2387
2388         CpvAccess(procChkptBuf) = NULL;
2389         CpvAccess(buddyBuf) = NULL;
2390         CpvAccess(recoverProcBuf) = NULL;
2391         CpvAccess(recoverArrayBuf) = NULL;
2392         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2393         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2394         CpvAccess(chkpBuf)[0] = NULL;
2395         CpvAccess(chkpBuf)[1] = NULL;
2396         CpvAccess(localProcChkpBuf)[0] = NULL;
2397         CpvAccess(localProcChkpBuf)[1] = NULL;
2398
2399         CpvAccess(curPointer) = 0;
2400         CpvAccess(recvdLocal) = 0;
2401         CpvAccess(localChkpDone) = 0;
2402         CpvAccess(remoteChkpDone) = 0;
2403         CpvAccess(remoteStarted) = 0;
2404         CpvAccess(localStarted) = 0;
2405         CpvAccess(localReady) = 0;
2406         CpvAccess(remoteReady) = 0;
2407         CpvAccess(recvdRemote) = 0;
2408         CpvAccess(recvdProcChkp) = 0;
2409         CpvAccess(localChecksum) = 0;
2410         CpvAccess(remoteChecksum) = 0;
2411         CpvAccess(recvdArrayChkp) = 0;
2412
2413         notify_crash_fn = notify_crash;
2414
2415 #if ! CMK_CONVERSE_MPI
2416         // print pid to kill
2417         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2418         //  sleep(4);
2419 #endif
2420 #endif
2421       }
2422
2423
2424       extern "C"
2425         int CkHasCheckpoints()
2426         {
2427           return checkpointed;
2428         }
2429
2430       /// @todo: the following definitions should be moved to a separate file containing
2431       // structures and functions about fault tolerance strategies
2432
2433       /**
2434        *  * @brief: function for killing a process                                             
2435        *   */
2436 #ifdef CMK_MEM_CHECKPOINT
2437 #if CMK_HAS_GETPID
2438       void killLocal(void *_dummy,double curWallTime){
2439         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2440         if(CmiWallTimer()<killTime-1){
2441           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2442         }else{ 
2443 #if CMK_CONVERSE_MPI
2444           CkDieNow();
2445 #else 
2446           kill(getpid(),SIGKILL);                                               
2447 #endif
2448         }              
2449       } 
2450 #else
2451       void killLocal(void *_dummy,double curWallTime){
2452         CmiAbort("kill() not supported!");
2453       }
2454 #endif
2455 #endif
2456
2457 #ifdef CMK_MEM_CHECKPOINT
2458       /**
2459        * @brief: reads the file with the kill information
2460        */
2461       void readKillFile(){
2462         FILE *fp=fopen(killFile,"r");
2463         if(!fp){
2464           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2465           return;
2466         }
2467         int proc;
2468         double sec;
2469         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2470           if(proc == CkMyNode() && CkMyRank() == 0){
2471             killTime = CmiWallTimer()+sec;
2472             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2473             CcdCallFnAfter(killLocal,NULL,sec*1000);
2474           }
2475         }
2476         fclose(fp);
2477       }
2478
2479 #if ! CMK_CONVERSE_MPI
2480       void CkDieNow()
2481       {
2482 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2483         // ignored for non-mpi version
2484         CmiPrintf("[%d] die now.\n", CmiMyPe());
2485         killTime = CmiWallTimer()+0.001;
2486         CcdCallFnAfter(killLocal,NULL,1);
2487 #endif
2488       }
2489 #endif
2490
2491 #endif
2492
2493 #include "CkMemCheckpoint.def.h"
2494
2495