separate table for local obj
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   if(CkMyPe()==0){
440     CkPrintf("local max iter is %d\n",localMaxIter);
441   }
442   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
443   int elemCount = CkCountChkpSyncElements();
444   if(CkMyPe()==0)
445     startTime = CmiWallTimer();
446   if(elemCount == 0){
447     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448   }
449 }
450
451 //when one replica failes, decide the next chkp iter
452
453 void CkMemCheckPT::recvIter(int iter){
454   if(maxIter<iter){
455     maxIter=iter;
456   }
457 }
458
459 void CkMemCheckPT::recvMaxIter(int iter){
460   localDecided = false;
461   if(CkMyPe()==0)
462     CkPrintf("checkpoint iteration is %d\n",iter);
463   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
464 }
465
466 void CkMemCheckPT::reachChkpIter(){
467   recvIterCount++;
468   elemCount = CkCountChkpSyncElements();
469   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
470   if(recvIterCount == elemCount){
471     recvIterCount = 0;
472     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
473   }
474 }
475
476 void CkMemCheckPT::startChkp(){
477   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
478   CkStartMemCheckpoint(cpCallback);
479 }
480
481 // called by checkpoint mgr to restore an array element
482 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
483 {
484 #if CMK_MEM_CHECKPOINT
485   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
486   // m->index.print();
487   PUP::fromMem p(m->packData);
488   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
489   CmiAssert(mgr);
490 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
491   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
492 #else
493   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
494 #endif
495
496   // find a list of array elements bound together
497   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
498   CmiAssert(elt);
499   CkLocRec_local *rec = elt->myRec;
500   CkVec<CkMigratable *> list;
501   mgr->migratableList(rec, list);
502   CmiAssert(list.length() > 0);
503   for (int l=0; l<list.length(); l++) {
504     elt = (ArrayElement *)list[l];
505     elt->budPEs[0] = m->bud1;
506     elt->budPEs[1] = m->bud2;
507     //    reset, may not needed now
508     // for now.
509     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
510       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
511       if (c) c->redNo = 0;
512     }
513   }
514 #endif
515 }
516
517 // return 1 if pe is a crashed processor
518 int CkMemCheckPT::isFailed(int pe)
519 {
520   for (int i=0; i<failedPes.length(); i++)
521     if (failedPes[i] == pe) return 1;
522   return 0;
523 }
524
525 // add pe into history list of all failed processors
526 void CkMemCheckPT::failed(int pe)
527 {
528   if (isFailed(pe)) return;
529   failedPes.push_back(pe);
530 }
531
532 int CkMemCheckPT::totalFailed()
533 {
534   return failedPes.length();
535 }
536
537 // create an checkpoint entry for array element of aid with index.
538 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
539 {
540   // error check, no duplicate
541   int idx, len = ckTable.size();
542   for (idx=0; idx<len; idx++) {
543     CkCheckPTInfo *entry = ckTable[idx];
544     if (index == entry->index) {
545       if (loc == entry->locMgr) {
546         // bindTo array elements
547         return;
548       }
549       // for array inheritance, the following check may fail
550       // because ArrayElement constructor of all superclasses are called
551       if (aid == entry->aid) {
552         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
553         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
554       }
555     }
556   }
557   CkCheckPTInfo *newEntry;
558   if (where == CkCheckPoint_inMEM)
559     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
560   else
561     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
562   ckTable.push_back(newEntry);
563   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
564 }
565
566 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
567 {
568 #if !CMK_CHKP_ALL       
569   int buddy = msg->bud1;
570   if (buddy == CkMyPe()) buddy = msg->bud2;
571   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
572   recvData(msg);
573   // ack
574   thisProxy[buddy].gotData();
575 #else
576   chkpTable[0] = NULL;
577   chkpTable[1] = NULL;
578   recvArrayCheckpoint(msg);
579   thisProxy[msg->bud2].gotData();
580 #endif
581 }
582
583 // loop through my checkpoint table and ask checkpointed array elements
584 // to send me checkpoint data.
585 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
586 void CkMemCheckPT::doItNow(int starter)
587 {
588   checkpointed = 1;
589   //cpCallback = cb;
590   cpStarter = starter;
591   inCheckpointing = 1;
592   if (CkMyPe() == cpStarter) {
593     startTime = CmiWallTimer();
594     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
595   }
596 #if CMK_CONVERSE_MPI
597   if(CmiNumPartition()==1)
598 #endif    
599   {
600
601 #if !CMK_CHKP_ALL
602     int len = ckTable.length();
603     for (int i=0; i<len; i++) {
604       CkCheckPTInfo *entry = ckTable[i];
605       // always let the bigger number processor send request
606       //if (CkMyPe() < entry->pNo) continue;
607       // always let the smaller number processor send request, may on same proc
608       if (!isMaster(entry->pNo)) continue;
609       // call inmem_checkpoint to the array element, ask it to send
610       // back checkpoint data via recvData().
611       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
612       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
613     }
614     // if my table is empty, then I am done
615     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
616 #else
617     startArrayCheckpoint();
618 #endif
619     sendProcData();
620   }
621 #if CMK_CONVERSE_MPI
622   else
623   {
624     startCheckpoint();
625   }
626 #endif
627   // pack and send proc level data
628 }
629
630 class MemElementPacker : public CkLocIterator{
631   private:
632     //    CkLocMgr *locMgr;
633     PUP::er &p;
634     std::map<CkHashCode,CkLocation> arrayMap;
635   public:
636     MemElementPacker(PUP::er &p_):p(p_){};
637     void addLocation(CkLocation &loc){
638       CkArrayIndexMax idx = loc.getIndex();
639       arrayMap[idx.hash()] = loc;
640     }
641     void writeCheckpoint(){
642       std::map<CkHashCode, CkLocation>::iterator it;
643       for(it = arrayMap.begin();it!=arrayMap.end();it++){
644         CkLocation loc = it->second;
645         CkLocMgr *locMgr = loc.getManager();
646         CkArrayIndexMax idx = loc.getIndex();
647         CkGroupID gID = locMgr->ckGetGroupID();
648         p|gID;
649         p|idx;
650         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
651       }
652     }
653 };
654
655 void pupAllElements(PUP::er &p){
656 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
657   int numElements;
658   if(!p.isUnpacking()){
659     numElements = CkCountArrayElements();
660   }
661   p | numElements;
662   if(!p.isUnpacking()){
663     MemElementPacker packer(p);
664     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
665     packer.writeCheckpoint();
666   }
667 #endif
668 }
669
670 void CkMemCheckPT::startArrayCheckpoint(){
671 #if CMK_CHKP_ALL
672   int size;
673   {
674     PUP::sizer psizer;
675     pupAllElements(psizer);
676     size = psizer.size();
677   }
678   int packSize = size/sizeof(double)+1;
679   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
680   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
681   msg->len = size;
682   msg->cp_flag = 1;
683   int budPEs[2];
684   msg->bud1=CkMyPe();
685   msg->bud2=ChkptOnPe(CkMyPe());
686   {
687     PUP::toMem p(msg->packData);
688     pupAllElements(p);
689   }
690   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
691   if(chkpTable[0]) delete chkpTable[0];
692   chkpTable[0] = msg;
693   //send the checkpoint to my 
694   recvCount++;
695 #endif
696 }
697
698 void CkMemCheckPT::startCheckpoint(){
699 #if CMK_CONVERSE_MPI
700   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
701     CkPrintf("in start checkpointing!!!!\n");
702   int size;
703   {
704     PUP::sizer p;
705     _handleProcData(p,CmiFalse);
706     size = p.size();
707   }
708   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
709   procMsg->len = size;
710   procMsg->cp_flag = 1;
711   {
712     PUP::toMem p(procMsg->packData);
713     _handleProcData(p,CmiFalse);
714   }
715   int pointer = CpvAccess(curPointer);
716   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
717   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
718
719   {
720     PUP::sizer psizer;
721     pupAllElements(psizer);
722     size = psizer.size();
723   }
724   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
725   msg->len = size;
726   msg->cp_flag = 1;
727   int checksum;
728   {
729     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
730       PUP::checker p(msg->packData);
731       pupAllElements(p);
732       checksum = p.getChecksum();
733     }else{  
734 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
735       PUP::toMem p(msg->packData);
736       pupAllElements(p);
737     }
738   }
739   pointer = CpvAccess(curPointer);
740   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
741   CpvAccess(chkpBuf)[pointer] = msg;
742   if(CkMyPe()==0)
743     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
744   if(CkReplicaAlive()==1){
745     CpvAccess(recvdLocal) = 1;
746     if(CpvAccess(use_checksum)){
747       CpvAccess(localChecksum) = checksum;
748       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
749       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
750       //only one reaplica will send
751       if(CmiMyPartition()==0){
752         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
753         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
754       }
755     }else{
756       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
757       CkPackMessage(&env);
758       if(CmiMyPartition()==0){
759         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
760         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
761       }
762     }
763     if(CmiMyPartition()==0){
764       notifyReplica = 1;
765       thisProxy[CkMyPe()].doneComparison(true);
766     }
767   }
768   if(CpvAccess(recvdRemote)==1){
769     //only partition 1 will do it
770     //compare the checkpoint 
771     int size = CpvAccess(chkpBuf)[pointer]->len;
772     if(CpvAccess(use_checksum)){
773       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
774         thisProxy[CkMyPe()].doneComparison(true);
775       }
776       else{
777         thisProxy[CkMyPe()].doneComparison(false);
778       }
779     }else{
780       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
781         thisProxy[CkMyPe()].doneComparison(true);
782       }
783       else{
784         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
785         thisProxy[CkMyPe()].doneComparison(false);
786       }
787     }
788     if(CkMyPe()==0)
789       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
790   }
791   else{
792     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
793       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
794       if(CpvAccess(remoteReady)==1){
795         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
796         {       
797           int pointer = CpvAccess(curPointer);
798           //send the proc data
799           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
800           procMsg->pointer = pointer;
801           envelope * env = (envelope *)(UsrToEnv(procMsg));
802           CkPackMessage(&env);
803           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
804           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
805           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
806             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
807           }
808         }
809         //send the array checkpoint data
810         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
811         msg->pointer = CpvAccess(curPointer);
812         envelope * env = (envelope *)(UsrToEnv(msg));
813         CkPackMessage(&env);
814         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
815         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
816         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
817           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
818       }else{
819         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
820           int pointer = CpvAccess(curPointer);
821           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
822           (CpvAccess(recoverProcBuf))->pointer = pointer;
823         }
824         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
825         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
826         CpvAccess(localReady) = 1;
827       }
828       //can continue work, no need to wait for my replica
829       int _ret = 1;
830       notifyReplica = 1;
831       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
832       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
833     }
834   }
835 #endif
836 }
837
838 void CkMemCheckPT::doneComparison(bool ret){
839   int _ret = 1;
840   if(!ret){
841     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
842     _ret = 0;
843   }else{
844     _ret = 1;
845   }
846   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
847   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
848 }
849
850 void CkMemCheckPT::doneRComparison(int ret){
851   //    if(CpvAccess(curPointer) == 0){
852   //if(ret==CkNumPes()){
853     CpvAccess(localChkpDone) = 1;
854     if(CpvAccess(remoteChkpDone) ==1){
855       thisProxy.doneBothComparison();
856     }else{
857       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
858     }
859   //}
860   /*else{
861     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
862     startTime = CmiWallTimer();
863     thisProxy.RollBack();
864   }*/
865   if(notifyReplica == 0){
866     //notify the replica am done
867     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
868     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
869     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
870     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
871     notifyReplica = 1;
872   }
873 }
874
875 void CkMemCheckPT::doneBothComparison(){
876   CpvAccess(recvdRemote) = 0;
877   CpvAccess(remoteReady) = 0;
878   CpvAccess(localReady) = 0;
879   CpvAccess(recvdLocal) = 0;
880   CpvAccess(localChkpDone) = 0;
881   CpvAccess(remoteChkpDone) = 0;
882   CpvAccess(remoteStarted) = 0;
883   CpvAccess(localStarted) = 0;
884   CpvAccess(_remoteCrashedNode) = -1;
885   CkMemCheckPT::replicaAlive = 1;
886   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
887   CpvAccess(curPointer)^=1;
888   inCheckpointing = 0;
889   notifyReplica = 0;
890   if(CkMyPe() == 0){
891     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
892   }
893   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
894 }
895
896 void CkMemCheckPT::RollBack(){
897   //restore group data
898   checkpointed = 0;
899   inRestarting = 1;
900   int pointer = CpvAccess(curPointer)^1;//use the previous one
901   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
902   PUP::fromMem p(chkpMsg->packData);    
903
904   //destroy array elements
905   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
906   int numGroups = CkpvAccess(_groupIDTable)->size();
907   for(int i=0;i<numGroups;i++) {
908     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
909     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
910     obj->flushStates();
911     obj->ckJustMigrated();
912   }
913   //restore array elements
914
915   int numElements;
916   p|numElements;
917
918   if(p.isUnpacking()){
919     for(int i=0;i<numElements;i++){
920       CkGroupID gID;
921       CkArrayIndex idx;
922       p|gID;
923       p|idx;
924       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
925       CmiAssert(mgr);
926       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
927     }
928     }
929     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
930     contribute(cb);
931   }
932
933   void CkMemCheckPT::notifyReplicaDie(int pe){
934     replicaAlive = 0;
935     CpvAccess(_remoteCrashedNode) = pe;
936   }
937
938   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
939   {
940 #if CMK_CHKP_ALL
941     int idx = 1;
942     if(msg->bud1 == CkMyPe()){
943       idx = 0;
944     }
945     int isChkpting = msg->cp_flag;
946     if(isChkpting == 1){
947       if(chkpTable[idx]) delete chkpTable[idx];
948     }
949     chkpTable[idx] = msg;
950     if(isChkpting){
951       recvCount++;
952       if(recvCount == 2){
953         if (where == CkCheckPoint_inMEM) {
954           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
955         }
956         else if (where == CkCheckPoint_inDISK) {
957           // another barrier for finalize the writing using fsync
958           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
959           contribute(0,NULL,CkReduction::sum_int,localcb);
960         }
961         else
962           CmiAbort("Unknown checkpoint scheme");
963         recvCount = 0;
964       }
965     }
966 #endif
967   }
968
969   // don't handle array elements
970   static inline void _handleProcData(PUP::er &p, CmiBool create)
971   {
972     // save readonlys, and callback BTW
973     CkPupROData(p);
974
975     // save mainchares 
976     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
977
978 #ifndef CMK_CHARE_USE_PTR
979     // save non-migratable chare
980     CkPupChareData(p);
981 #endif
982
983     // save groups into Groups.dat
984     CkPupGroupData(p,create);
985
986     // save nodegroups into NodeGroups.dat
987     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
988   }
989
990   void CkMemCheckPT::sendProcData()
991   {
992     // find out size of buffer
993     int size;
994     {
995       PUP::sizer p;
996       _handleProcData(p,CmiTrue);
997       size = p.size();
998     }
999     int packSize = size;
1000     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1001     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1002     {
1003       PUP::toMem p(msg->packData);
1004       _handleProcData(p,CmiTrue);
1005     }
1006     msg->pe = CkMyPe();
1007     msg->len = size;
1008     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1009     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1010   }
1011
1012   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1013   {
1014     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1015     CpvAccess(procChkptBuf) = msg;
1016     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1017     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1018   }
1019
1020   // ArrayElement call this function to give us the checkpointed data
1021   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1022   {
1023     int len = ckTable.length();
1024     int idx;
1025     for (idx=0; idx<len; idx++) {
1026       CkCheckPTInfo *entry = ckTable[idx];
1027       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1028     }
1029     CkAssert(idx < len);
1030     int isChkpting = msg->cp_flag;
1031     ckTable[idx]->updateBuffer(msg);
1032     if (isChkpting) {
1033       // all my array elements have returned their inmem data
1034       // inform starter processor that I am done.
1035       recvCount ++;
1036       if (recvCount == ckTable.length()) {
1037         if (where == CkCheckPoint_inMEM) {
1038           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1039         }
1040         else if (where == CkCheckPoint_inDISK) {
1041           // another barrier for finalize the writing using fsync
1042           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1043           contribute(0,NULL,CkReduction::sum_int,localcb);
1044         }
1045         else
1046           CmiAbort("Unknown checkpoint scheme");
1047         recvCount = 0;
1048       } 
1049     }
1050   }
1051
1052   // only used in disk checkpointing
1053   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1054   {
1055     delete m;
1056 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1057     system("sync");
1058 #endif
1059     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1060   }
1061
1062   // only is called on cpStarter when checkpoint is done
1063   void CkMemCheckPT::cpFinish()
1064   {
1065     CmiAssert(CkMyPe() == cpStarter);
1066     peCount++;
1067     // now that all processors have finished, activate callback
1068     if (peCount == 2) 
1069     {
1070       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1071       peCount = 0;
1072       thisProxy.report();
1073     }
1074   }
1075
1076   // for debugging, report checkpoint info
1077   void CkMemCheckPT::report()
1078   {
1079     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1080     inCheckpointing = 0;
1081 #if !CMK_CHKP_ALL
1082     int objsize = 0;
1083     int len = ckTable.length();
1084     for (int i=0; i<len; i++) {
1085       CkCheckPTInfo *entry = ckTable[i];
1086       CmiAssert(entry);
1087       objsize += entry->getSize();
1088     }
1089     CmiAssert(CpvAccess(procChkptBuf));
1090     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1091 #else
1092     if(CkMyPe()==0)
1093       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1094 #endif
1095   }
1096
1097   /*****************************************************************************
1098     RESTART Procedure
1099    *****************************************************************************/
1100
1101   // master processor of two buddies
1102   inline int CkMemCheckPT::isMaster(int buddype)
1103   {
1104 #if 0
1105     int mype = CkMyPe();
1106     //CkPrintf("ismaster: %d %d\n", pe, mype);
1107     if (CkNumPes() - totalFailed() == 2) {
1108       return mype > buddype;
1109     }
1110     for (int i=1; i<CkNumPes(); i++) {
1111       int me = (buddype+i)%CkNumPes();
1112       if (isFailed(me)) continue;
1113       if (me == mype) return 1;
1114       else return 0;
1115     }
1116     return 0;
1117 #else
1118     // smaller one
1119     int mype = CkMyPe();
1120     //CkPrintf("ismaster: %d %d\n", pe, mype);
1121     if (CkNumPes() - totalFailed() == 2) {
1122       return mype < buddype;
1123     }
1124 #if NODE_CHECKPOINT
1125     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1126     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1127 #else
1128       for (int i=1; i<CkNumPes(); i++) {
1129 #endif
1130         int me = (mype+i)%CkNumPes();
1131         if (isFailed(me)) continue;
1132         if (me == buddype) return 1;
1133         else return 0;
1134       }
1135       return 0;
1136 #endif
1137     }
1138
1139
1140
1141 #if 0
1142     // helper class to pup all elements that belong to same ckLocMgr
1143     class ElementDestoryer : public CkLocIterator {
1144       private:
1145         CkLocMgr *locMgr;
1146       public:
1147         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1148         void addLocation(CkLocation &loc) {
1149           CkArrayIndex idx=loc.getIndex();
1150           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1151           loc.destroy();
1152         }
1153     };
1154 #endif
1155
1156     // restore the bitmap vector for LB
1157     void CkMemCheckPT::resetLB(int diepe)
1158     {
1159 #if CMK_LBDB_ON
1160       int i;
1161       char *bitmap = new char[CkNumPes()];
1162       // set processor available bitmap
1163       get_avail_vector(bitmap);
1164       for (i=0; i<failedPes.length(); i++)
1165         bitmap[failedPes[i]] = 0; 
1166       bitmap[diepe] = 0;
1167
1168 #if CK_NO_PROC_POOL
1169       set_avail_vector(bitmap);
1170 #endif
1171
1172       // if I am the crashed pe, rebuild my failedPEs array
1173       if (CkMyNode() == diepe)
1174         for (i=0; i<CkNumPes(); i++) 
1175           if (bitmap[i]==0) failed(i);
1176
1177       delete [] bitmap;
1178 #endif
1179     }
1180
1181     static double restartT;
1182
1183     // in case when failedPe dies, everybody go through its checkpoint table:
1184     // destory all array elements
1185     // recover lost buddies
1186     // reconstruct all array elements from check point data
1187     // called on all processors
1188     void CkMemCheckPT::restart(int diePe)
1189     {
1190 #if CMK_MEM_CHECKPOINT
1191       double curTime = CmiWallTimer();
1192       if (CkMyPe() == diePe){
1193         restartT = CmiWallTimer();
1194         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1195       }
1196       stage = (char*)"resetLB";
1197       startTime = curTime;
1198       if (CkMyPe() == diePe)
1199         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1200
1201 #if CK_NO_PROC_POOL
1202       failed(diePe);    // add into the list of failed pes
1203 #endif
1204       thisFailedPe = diePe;
1205
1206       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1207
1208       inRestarting = 1;
1209
1210       // disable load balancer's barrier
1211       //if (CkMyPe() != diePe) resetLB(diePe);
1212
1213       CKLOCMGR_LOOP(mgr->startInserting(););
1214
1215
1216       if(CmiNumPartition()==1){
1217         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1218       }else{
1219         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1220         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1221       }
1222 #endif
1223     }
1224
1225     // loally remove all array elements
1226     void CkMemCheckPT::removeArrayElements()
1227     {
1228 #if CMK_MEM_CHECKPOINT
1229       int len = ckTable.length();
1230       double curTime = CmiWallTimer();
1231       if (CkMyPe() == thisFailedPe) 
1232         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1233       stage = (char*)"removeArrayElements";
1234       startTime = curTime;
1235
1236       //  if (cpCallback.isInvalid()) 
1237       //          CkPrintf("invalid pe %d\n",CkMyPe());
1238       //          CkAbort("Didn't set restart callback\n");;
1239       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1240
1241       // get rid of all buffering and remote recs
1242       // including destorying all array elements
1243 #if CK_NO_PROC_POOL  
1244       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1245 #else
1246       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1247 #endif
1248       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1249 #endif
1250     }
1251
1252     // flush state in reduction manager
1253     void CkMemCheckPT::resetReductionMgr()
1254     {
1255       if (CkMyPe() == thisFailedPe) 
1256         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1257       int numGroups = CkpvAccess(_groupIDTable)->size();
1258       for(int i=0;i<numGroups;i++) {
1259         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1260         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1261         obj->flushStates();
1262         obj->ckJustMigrated();
1263       }
1264       // reset again
1265       //CpvAccess(_qd)->flushStates();
1266       if(CmiNumPartition()==1){
1267         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1268       }
1269       else
1270         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1271     }
1272
1273     // recover the lost buddies
1274     void CkMemCheckPT::recoverBuddies()
1275     {
1276       int idx;
1277       int len = ckTable.length();
1278       // ready to flush reduction manager
1279       // cannot be CkMemCheckPT::restart because destory will modify states
1280       double curTime = CmiWallTimer();
1281       if (CkMyPe() == thisFailedPe)
1282         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1283       stage = (char *)"recoverBuddies";
1284       if (CkMyPe() == thisFailedPe)
1285         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1286       startTime = curTime;
1287
1288       // recover buddies
1289       expectCount = 0;
1290 #if !CMK_CHKP_ALL
1291       for (idx=0; idx<len; idx++) {
1292         CkCheckPTInfo *entry = ckTable[idx];
1293         if (entry->pNo == thisFailedPe) {
1294 #if CK_NO_PROC_POOL
1295           // find a new buddy
1296           int budPe = BuddyPE(CkMyPe());
1297 #else
1298           int budPe = thisFailedPe;
1299 #endif
1300           CkArrayCheckPTMessage *msg = entry->getCopy();
1301           msg->bud1 = budPe;
1302           msg->bud2 = CkMyPe();
1303           msg->cp_flag = 0;            // not checkpointing
1304           thisProxy[budPe].recoverEntry(msg);
1305           expectCount ++;
1306         }
1307       }
1308 #else
1309       //send to failed pe
1310       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1311 #if CK_NO_PROC_POOL
1312         // find a new buddy
1313         int budPe = BuddyPE(CkMyPe());
1314 #else
1315         int budPe = thisFailedPe;
1316 #endif
1317         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1318         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1319         msg->cp_flag = 0;            // not checkpointing
1320         msg->bud1 = budPe;
1321         msg->bud2 = CkMyPe();
1322         thisProxy[budPe].recoverEntry(msg);
1323         expectCount ++;
1324       }
1325 #endif
1326
1327       if (expectCount == 0) {
1328         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1329       }
1330       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1331     }
1332
1333     void CkMemCheckPT::gotData()
1334     {
1335       ackCount ++;
1336       if (ackCount == expectCount) {
1337         ackCount = 0;
1338         expectCount = -1;
1339         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1340         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1341       }
1342     }
1343
1344     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1345     {
1346
1347       for (int i=0; i<n; i++) {
1348         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1349         mgr->updateLocation(idx[i], nowOnPe);
1350       }
1351       thisProxy[nowOnPe].gotReply();
1352     }
1353
1354     // restore array elements
1355     void CkMemCheckPT::recoverArrayElements()
1356     {
1357       double curTime = CmiWallTimer();
1358       int len = ckTable.length();
1359       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1360       stage = (char *)"recoverArrayElements";
1361       if (CkMyPe() == thisFailedPe)
1362         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1363       startTime = curTime;
1364       int flag = 0;
1365       // recover all array elements
1366       int count = 0;
1367
1368 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1369       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1370       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1371 #endif
1372
1373 #if !CMK_CHKP_ALL
1374       for (int idx=0; idx<len; idx++)
1375       {
1376         CkCheckPTInfo *entry = ckTable[idx];
1377 #if CK_NO_PROC_POOL
1378         // the bigger one will do 
1379         //    if (CkMyPe() < entry->pNo) continue;
1380         if (!isMaster(entry->pNo)) continue;
1381 #else
1382         // smaller one do it, which has the original object
1383         if (CkMyPe() == entry->pNo+1 || 
1384             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1385 #endif
1386         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1387
1388         entry->updateBuddy(CkMyPe(), entry->pNo);
1389         CkArrayCheckPTMessage *msg = entry->getCopy();
1390         // gzheng
1391         //thisProxy[CkMyPe()].inmem_restore(msg);
1392         inmem_restore(msg);
1393 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1394         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1395         int homePe = mgr->homePe(msg->index);
1396         if (homePe != CkMyPe()) {
1397           gmap[homePe].push_back(msg->locMgr);
1398           imap[homePe].push_back(msg->index);
1399         }
1400 #endif
1401         CkFreeMsg(msg);
1402         count ++;
1403       }
1404 #else
1405       char * packData;
1406       if(CmiNumPartition()==1){
1407         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1408         packData = (char *)msg->packData;
1409       }
1410       else{
1411         int pointer = CpvAccess(curPointer);
1412         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1413         packData = msg->packData;
1414       }
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       recoverAll(packData,gmap,imap);
1417 #else
1418       recoverAll(packData);
1419 #endif
1420 #endif
1421 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1422       for (int i=0; i<CkNumPes(); i++) {
1423         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1424           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1425           flag++;       
1426         }
1427       }
1428       delete [] imap;
1429       delete [] gmap;
1430 #endif
1431       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1432
1433       CKLOCMGR_LOOP(mgr->doneInserting(););
1434
1435       // _crashedNode = -1;
1436       CpvAccess(_crashedNode) = -1;
1437 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1438       if (CkMyPe() == 0)
1439         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1440 #else
1441       if(flag == 0)
1442       {
1443         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1444       }
1445 #endif
1446     }
1447
1448     void CkMemCheckPT::gotReply(){
1449       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1450     }
1451
1452     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1453 #if CMK_CHKP_ALL
1454       PUP::fromMem p(packData);
1455       int numElements;
1456       p|numElements;
1457       if(p.isUnpacking()){
1458         for(int i=0;i<numElements;i++){
1459           CkGroupID gID;
1460           CkArrayIndex idx;
1461           p|gID;
1462           p|idx;
1463           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1464           int homePe = mgr->homePe(idx);
1465 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1466           mgr->resume(idx,p,CmiTrue,CmiTrue);
1467 #else
1468           if(CmiNumPartition()==1)
1469             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1470           else{
1471             if(CkMyPe()==thisFailedPe){
1472               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1473             }
1474             else{
1475               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1476             }
1477           }
1478 #endif
1479 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1480           homePe = mgr->homePe(idx);
1481           if (homePe != CkMyPe()) {
1482             gmap[homePe].push_back(gID);
1483             imap[homePe].push_back(idx);
1484           }
1485 #endif
1486         }
1487       }
1488 #endif
1489     }
1490
1491
1492     // on every processor
1493     // turn load balancer back on
1494     void CkMemCheckPT::finishUp()
1495     {
1496       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1497       //CKLOCMGR_LOOP(mgr->doneInserting(););
1498
1499       if (CkMyPe() == thisFailedPe)
1500       {
1501         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1502         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1503         fflush(stdout);
1504       }
1505       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1506       inRestarting = 0;
1507       maxIter = -1;
1508 #if CMK_CONVERSE_MPI    
1509       if(CmiNumPartition()!=1){
1510         CpvAccess(recvdProcChkp) = 0;
1511         CpvAccess(recvdArrayChkp) = 0;
1512         CpvAccess(curPointer)^=1;
1513         //notify my replica, restart is done
1514         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1515           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1516           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1517           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1518         }
1519       }
1520       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1521         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1522       }
1523 #endif
1524
1525 #if CK_NO_PROC_POOL
1526 #if NODE_CHECKPOINT
1527       int numnodes = CmiNumPhysicalNodes();
1528 #else
1529       int numnodes = CkNumPes();
1530 #endif
1531       if (numnodes-totalFailed() <=2) {
1532         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1533         _memChkptOn = 0;
1534       }
1535 #endif
1536     }
1537
1538     void CkMemCheckPT::recoverFromSoftFailure()
1539     {
1540       inRestarting = 0;
1541       maxIter = -1;
1542       CpvAccess(recvdRemote) = 0;
1543       CpvAccess(recvdLocal) = 0;
1544       CpvAccess(localChkpDone) = 0;
1545       CpvAccess(remoteChkpDone) = 0;
1546       CpvAccess(remoteReady) = 0;
1547       CpvAccess(localReady) = 0;
1548       inCheckpointing = 0;
1549       notifyReplica = 0;
1550       CpvAccess(remoteStarted) = 0;
1551       CpvAccess(localStarted) = 0;
1552       CpvAccess(_remoteCrashedNode) = -1;
1553       CkMemCheckPT::replicaAlive = 1;
1554       inCheckpointing = 0;
1555       notifyReplica = 0;
1556       if(CkMyPe() == 0){
1557         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1558       }
1559       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1560     }
1561     // called only on 0
1562     void CkMemCheckPT::quiescence(CkCallback &cb)
1563     {
1564       static int pe_count = 0;
1565       pe_count ++;
1566       CmiAssert(CkMyPe() == 0);
1567       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1568       if (pe_count == CkNumPes()) {
1569         pe_count = 0;
1570         cb.send();
1571       }
1572     }
1573
1574     // User callable function - to start a checkpoint
1575     // callback cb is used to pass control back
1576     void CkStartMemCheckpoint(CkCallback &cb)
1577     {
1578 #if CMK_MEM_CHECKPOINT
1579       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1580       /*if (_memChkptOn == 0) {
1581         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1582         cb.send();
1583         return;
1584         }*/
1585       if (CkInRestarting()) {
1586         // trying to checkpointing during restart
1587         cb.send();
1588         return;
1589       }
1590       // store user callback and user data
1591       CkMemCheckPT::cpCallback = cb;
1592
1593
1594       //send to my replica that checkpoint begins 
1595       if(CkReplicaAlive()==1){
1596         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1597         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1598         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1599       }
1600       CpvAccess(localStarted) = 1;
1601       // broadcast to start check pointing
1602       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1603         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1604         checkptMgr.doItNow(CkMyPe());
1605       }
1606 #else
1607       // when mem checkpoint is disabled, invike cb immediately
1608       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1609       cb.send();
1610 #endif
1611     }
1612
1613     void CkRestartCheckPoint(int diePe)
1614     {
1615       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1616       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1617       // broadcast
1618       checkptMgr.restart(diePe);
1619     }
1620
1621     static int _diePE = -1;
1622
1623     // callback function used locally by ccs handler
1624     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1625     {
1626       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1627       CkRestartCheckPoint(_diePE);
1628     }
1629
1630
1631     // called on crashed PE
1632     static void restartBeginHandler(char *msg)
1633     {
1634       CmiFree(msg);
1635 #if CMK_MEM_CHECKPOINT
1636 #if CMK_USE_BARRIER
1637       if(CkMyPe()!=_diePE){
1638         printf("restar begin on %d\n",CkMyPe());
1639         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1640         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1641         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1642       }else{
1643         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1644         CkRestartCheckPointCallback(NULL, NULL);
1645       }
1646 #else
1647       static int count = 0;
1648       CmiAssert(CkMyPe() == _diePE);
1649       count ++;
1650       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1651         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1652         CkRestartCheckPointCallback(NULL, NULL);
1653         count = 0;
1654       }
1655 #endif
1656 #endif
1657     }
1658
1659     extern void _discard_charm_message();
1660     extern void _resume_charm_message();
1661
1662     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1663       return data;
1664     }
1665
1666     static void restartBcastHandler(char *msg)
1667     {
1668 #if CMK_MEM_CHECKPOINT
1669       // advance phase counter
1670       CkMemCheckPT::inRestarting = 1;
1671       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1672       // gzheng
1673       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1674
1675       if (CkMyPe()==_diePE)
1676         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1677
1678       // reset QD counters
1679       /*  gzheng
1680           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1681        */
1682
1683       /*  gzheng
1684           if (CkMyPe()==_diePE)
1685           CkRestartCheckPointCallback(NULL, NULL);
1686        */
1687       CmiFree(msg);
1688
1689       _resume_charm_message();
1690
1691       // reduction
1692       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1693       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1694 #if CMK_USE_BARRIER
1695       //CmiPrintf("before reduce\n");   
1696       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1697       //CmiPrintf("after reduce\n");    
1698 #else
1699       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1700 #endif 
1701       checkpointed = 0;
1702 #endif
1703     }
1704
1705     extern void _initDone();
1706
1707     bool compare(char * buf1, char *buf2){
1708       if(CkMyPe()==0)
1709         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1710       PUP::checker pchecker(buf1,buf2);
1711       pchecker.skip();
1712       int numElements;
1713       pchecker|numElements;
1714       for(int i=0;i<numElements;i++){
1715         CkGroupID gID;
1716         CkArrayIndex idx;
1717
1718         pchecker|gID;
1719         pchecker|idx;
1720
1721         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1722         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1723       }
1724       if(CkMyPe()==0)
1725         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1726       //int fault_num = pchecker.getFaultNum();
1727       bool result = pchecker.getResult();
1728       /*if(!result){
1729         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1730       }*/
1731       return result;
1732     }
1733     int getChecksum(char * buf){
1734       PUP::checker pchecker(buf);
1735       pchecker.skip();
1736       int numElements;
1737       pchecker|numElements;
1738 //      CkPrintf("num %d\n",numElements);
1739       for(int i=0;i<numElements;i++){
1740         CkGroupID gID;
1741         CkArrayIndex idx;
1742
1743         pchecker|gID;
1744         pchecker|idx;
1745
1746         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1747         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1748       }
1749       return pchecker.getChecksum();
1750     }
1751
1752     static void recvRemoteChkpHandler(char *msg){
1753       CpvAccess(remoteChkpDone) = 1;
1754       if(CpvAccess(use_checksum)){
1755         if(CkMyPe()==0)
1756           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1757         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1758         CpvAccess(recvdRemote) = 1;
1759         if(CpvAccess(recvdLocal)==1){
1760           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1761             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1762           }
1763           else{
1764             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1765           }
1766         }
1767       }else{
1768         envelope *env = (envelope *)msg;
1769         CkUnpackMessage(&env);
1770         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1771         if(CpvAccess(recvdLocal)==1){
1772           int pointer = CpvAccess(curPointer);
1773           int size = CpvAccess(chkpBuf)[pointer]->len;
1774           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1775             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1776           }else
1777           {
1778             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1779           }
1780           delete chkpMsg;
1781           if(CkMyPe()==0)
1782             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1783         }else{
1784           CpvAccess(recvdRemote) = 1;
1785           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1786           CpvAccess(buddyBuf) = chkpMsg;
1787         }
1788       }
1789     }
1790
1791     static void replicaRecoverHandler(char *msg){
1792       //fflush(stdout);
1793       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1794       CpvAccess(remoteChkpDone) = 1;
1795       if(CpvAccess(localChkpDone) == 1)
1796         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1797       CmiFree(msg);
1798     }
1799
1800     static void replicaChkpDoneHandler(char *msg){
1801       CpvAccess(remoteChkpDone) = 1;
1802       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1803       if(CpvAccess(localChkpDone) == 1)
1804         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1805       CmiFree(msg);
1806     }
1807
1808     static void replicaDieHandler(char * msg){
1809 #if CMK_CONVERSE_MPI
1810       //broadcast to every one in my replica
1811       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1812       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1813 #endif
1814     }
1815
1816     static void replicaChkpStartHandler(char * msg){
1817       CpvAccess(remoteStarted) =1;
1818       if(CpvAccess(localStarted)==1){    
1819         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1820         checkptMgr.doItNow(0);
1821       }
1822     }
1823
1824
1825     static void replicaDieBcastHandler(char *msg){
1826       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1827       CpvAccess(_remoteCrashedNode) = diePe;
1828       if(CkMyPe()==diePe){
1829         CmiPrintf("pe %d in replicad word die\n",diePe);
1830         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1831         fflush(stdout);
1832       }
1833       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1834         find_spare_mpirank(diePe,CmiMyPartition()^1);
1835       }
1836       //broadcast to my partition to get local max iter
1837       if(CpvAccess(resilience)!=1){
1838         CkMemCheckPT::replicaAlive = 0;
1839         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1840       }
1841       CmiFree(msg);
1842     }
1843
1844     static void recoverRemoteProcDataHandler(char *msg){
1845       envelope *env = (envelope *)msg;
1846       CkUnpackMessage(&env);
1847       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1848
1849       //store the checkpoint
1850       int pointer = procMsg->pointer;
1851
1852
1853       if(CkMyPe()==CpvAccess(_crashedNode)){
1854         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1855         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1856         PUP::fromMem p(procMsg->packData);
1857         _handleProcData(p,CmiTrue);
1858         _initDone();
1859       }
1860       else{
1861         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1862         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1863         //_handleProcData(p,CmiFalse);
1864       }
1865       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1866       CKLOCMGR_LOOP(mgr->startInserting(););
1867
1868       CpvAccess(recvdProcChkp) =1;
1869       if(CpvAccess(recvdArrayChkp)==1){
1870         _resume_charm_message();
1871         _diePE = CpvAccess(_crashedNode);
1872         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1873         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1874         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1875       }
1876     }
1877
1878     static void recoverRemoteArrayDataHandler(char *msg){
1879       envelope *env = (envelope *)msg;
1880       CkUnpackMessage(&env);
1881       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1882
1883       //store the checkpoint
1884       int pointer = chkpMsg->pointer;
1885       CpvAccess(curPointer) = pointer;
1886       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1887       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1888       CpvAccess(recvdArrayChkp) =1;
1889       CkMemCheckPT::inRestarting = 1;
1890       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1891         _resume_charm_message();
1892         _diePE = CpvAccess(_crashedNode);
1893         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1894         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1895         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1896         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1897       }
1898     }
1899
1900     static void recvPhaseHandler(char * msg)
1901     {
1902       CpvAccess(_curRestartPhase)--;
1903       CkMemCheckPT::inRestarting = 1;
1904       CmiFree(msg);
1905       //notify the buddy in the replica now i can receive the checkpoint msg
1906       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1907         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1908         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1909         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1910         //timer start
1911         if(CpvAccess(_crashedNode)==CkMyPe())
1912           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1913         if(CpvAccess(_crashedNode)==CmiMyPe())
1914           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1915       }else{
1916         CpvAccess(curPointer)^=1;
1917         CkMemCheckPT::inRestarting = 1;
1918         _resume_charm_message();
1919         _diePE = CpvAccess(_crashedNode);
1920       }
1921     }
1922     
1923     static void askRecoverDataHandler(char * msg){
1924       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1925         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1926       if(CpvAccess(resilience)!=1){
1927         CpvAccess(remoteReady)=1;
1928         if(CpvAccess(localReady)==1){
1929           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1930           {     
1931             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1932             CkPackMessage(&env);
1933             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1934             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1935             CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
1936           }
1937           //send the array checkpoint data
1938           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1939           CkPackMessage(&env);
1940           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1941           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1942           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1943             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1944         }
1945       }else{
1946         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
1947         int pointer = CpvAccess(curPointer)^1;
1948         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1949         procMsg->pointer = pointer;
1950         envelope * env = (envelope *)(UsrToEnv(procMsg));
1951         CkPackMessage(&env);
1952         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1953         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1954         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
1955         
1956         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1957         arrayMsg->pointer = pointer;
1958         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1959         CkPackMessage(&env1);
1960         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1961         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1962         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1963       }
1964     }
1965     // called on crashed processor
1966     static void recoverProcDataHandler(char *msg)
1967     {
1968 #if CMK_MEM_CHECKPOINT
1969       int i;
1970       envelope *env = (envelope *)msg;
1971       CkUnpackMessage(&env);
1972       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1973       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1974       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1975       PUP::fromMem p(procMsg->packData);
1976       _handleProcData(p,CmiTrue);
1977
1978       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1979       // gzheng
1980       CKLOCMGR_LOOP(mgr->startInserting(););
1981
1982       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1983       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1984       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1985       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1986
1987       _initDone();
1988       //   CpvAccess(_qd)->flushStates();
1989       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1990 #endif
1991     }
1992     //for replica, got the phase number from my neighbor processor in the current partition
1993     static void askPhaseHandler(char *msg)
1994     {
1995 #if CMK_MEM_CHECKPOINT
1996       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1997       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1998       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1999       CmiSetHandler(msg, recvPhaseHandlerIdx);
2000       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2001 #endif
2002     }
2003     // called on its backup processor
2004     // get backup message buffer and sent to crashed processor
2005     static void askProcDataHandler(char *msg)
2006     {
2007 #if CMK_MEM_CHECKPOINT
2008       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2009       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2010       if (CpvAccess(procChkptBuf) == NULL)  {
2011         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2012         CkAbort("no checkpoint found");
2013       }
2014       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2015       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2016
2017       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2018
2019       CkPackMessage(&env);
2020       CmiSetHandler(env, recoverProcDataHandlerIdx);
2021       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2022       CpvAccess(procChkptBuf) = NULL;
2023       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2024 #endif
2025     }
2026
2027     // called on PE 0
2028     void qd_callback(void *m)
2029     {
2030       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2031       fflush(stdout);
2032       CkFreeMsg(m);
2033       if(CmiNumPartition()==1){
2034 #ifdef CMK_SMP
2035         for(int i=0;i<CmiMyNodeSize();i++){
2036           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2037           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2038           CmiSetHandler(msg, askProcDataHandlerIdx);
2039           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2040           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2041         }
2042         return;
2043 #endif
2044         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2045         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2046         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2047         CmiSetHandler(msg, askProcDataHandlerIdx);
2048         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2049         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2050       }
2051       else{
2052         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2053         CmiSetHandler(msg, recvPhaseHandlerIdx);
2054         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2055       }
2056     }
2057
2058     // on crashed node
2059     void CkMemRestart(const char *dummy, CkArgMsg *args)
2060     {
2061 #if CMK_MEM_CHECKPOINT
2062       _diePE = CmiMyNode();
2063       CpvAccess( _crashedNode )= CmiMyNode();
2064       CkMemCheckPT::inRestarting = 1;
2065       _discard_charm_message();
2066
2067       /*if(CmiMyRank()==0){
2068         CkCallback cb(qd_callback);
2069         CkStartQD(cb);
2070         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2071         }*/
2072       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2073       if(CmiNumPartition()==1){
2074         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2075         restartT = CmiWallTimer();
2076         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2077         fflush(stdout);
2078         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2079         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2080         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2081         CmiSetHandler(msg, askProcDataHandlerIdx);
2082         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2083         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2084       }
2085       else{
2086         CkCallback cb(qd_callback);
2087         CkStartQD(cb);
2088       }
2089 #else
2090       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2091 #endif
2092     }
2093
2094     // can be called in other files
2095     // return true if it is in restarting
2096     extern "C"
2097       int CkInRestarting()
2098       {
2099 #if CMK_MEM_CHECKPOINT
2100         if (CpvAccess( _crashedNode)!=-1) return 1;
2101         // gzheng
2102         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2103         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2104         return CkMemCheckPT::inRestarting;
2105 #else
2106         return 0;
2107 #endif
2108       }
2109
2110     extern "C"
2111       int CkReplicaAlive()
2112       {
2113         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2114         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2115         return CkMemCheckPT::replicaAlive;
2116
2117         /*if(CkMemCheckPT::replicaDead==1)
2118           return 0;
2119           else          
2120           return 1;*/
2121       }
2122
2123     extern "C"
2124       int CkInCheckpointing()
2125       {
2126         return CkMemCheckPT::inCheckpointing;
2127       }
2128
2129     extern "C"
2130       void CkSetInLdb(){
2131 #if CMK_MEM_CHECKPOINT
2132         CkMemCheckPT::inLoadbalancing = 1;
2133 #endif
2134       }
2135
2136     extern "C"
2137       int CkInLdb(){
2138 #if CMK_MEM_CHECKPOINT
2139         return CkMemCheckPT::inLoadbalancing;
2140 #endif
2141         return 0;
2142       }
2143
2144     extern "C"
2145       void CkResetInLdb(){
2146 #if CMK_MEM_CHECKPOINT
2147         CkMemCheckPT::inLoadbalancing = 0;
2148 #endif
2149       }
2150
2151     /*****************************************************************************
2152       module initialization
2153      *****************************************************************************/
2154
2155     static int arg_where = CkCheckPoint_inMEM;
2156
2157 #if CMK_MEM_CHECKPOINT
2158     void init_memcheckpt(char **argv)
2159     {
2160       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2161         arg_where = CkCheckPoint_inDISK;
2162       }
2163       CpvInitialize(int, use_checksum);
2164       CpvInitialize(int, resilience);
2165       CpvAccess(use_checksum)=0;
2166       CpvAccess(resilience)=0;
2167       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2168         CpvAccess(use_checksum)=1;
2169       }
2170       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2171         CpvAccess(resilience)=1;
2172       }
2173       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2174         CpvAccess(resilience)=2;
2175       }
2176       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2177         CpvAccess(resilience)=3;
2178       }
2179       // initiliazing _crashedNode variable
2180       CpvInitialize(int, _crashedNode);
2181       CpvInitialize(int, _remoteCrashedNode);
2182       CpvAccess(_crashedNode) = -1;
2183       CpvAccess(_remoteCrashedNode) = -1;
2184       init_FI(argv);
2185     }
2186 #endif
2187
2188     class CkMemCheckPTInit: public Chare {
2189       public:
2190         CkMemCheckPTInit(CkArgMsg *m) {
2191 #if CMK_MEM_CHECKPOINT
2192           if (arg_where == CkCheckPoint_inDISK) {
2193             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2194           }
2195           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2196           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2197 #endif
2198         }
2199     };
2200
2201     static void notifyHandler(char *msg)
2202     {
2203 #if CMK_MEM_CHECKPOINT
2204       CmiFree(msg);
2205       /* immediately increase restart phase to filter old messages */
2206       CpvAccess(_curRestartPhase) ++;
2207       CpvAccess(_qd)->flushStates();
2208       _discard_charm_message();
2209
2210 #endif
2211     }
2212
2213     extern "C"
2214       void notify_crash(int node)
2215       {
2216 #ifdef CMK_MEM_CHECKPOINT
2217         CpvAccess( _crashedNode) = node;
2218 #ifdef CMK_SMP
2219         for(int i=0;i<CkMyNodeSize();i++){
2220           CpvAccessOther(_crashedNode,i)=node;
2221         }
2222 #endif
2223         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2224         CkMemCheckPT::inRestarting = 1;
2225
2226         // this may be in interrupt handler, send a message to reset QD
2227         int pe = CmiNodeFirst(CkMyNode());
2228         for(int i=0;i<CkMyNodeSize();i++){
2229           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2230           CmiSetHandler(msg, notifyHandlerIdx);
2231           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2232         }
2233 #endif
2234       }
2235
2236     extern "C" void (*notify_crash_fn)(int node);
2237
2238
2239 #if CMK_CONVERSE_MPI
2240     void buddyDieHandler(char *msg)
2241     {
2242 #if CMK_MEM_CHECKPOINT
2243       // notify
2244       CkMemCheckPT::inRestarting = 1;
2245       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2246       notify_crash(diepe);
2247       // send message to crash pe to let it restart
2248       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2249       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2250       int buddy = obj->BuddyPE(CmiMyPe());
2251       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2252       if (buddy == diepe)  {
2253         mpi_restart_crashed(diepe, newrank);
2254       }
2255 #endif
2256     }
2257
2258     void pingHandler(void *msg)
2259     {
2260       lastPingTime = CmiWallTimer();
2261       CmiFree(msg);
2262     }
2263
2264     void pingCheckHandler()
2265     {
2266 #if CMK_MEM_CHECKPOINT
2267       double now = CmiWallTimer();
2268       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2269         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2270         int i, pe, buddy;
2271         // tell everyone the buddy dies
2272         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2273         for (i = 1; i < CmiNumPes(); i++) {
2274           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2275           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2276         }
2277         buddy = pe;
2278         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2279         fflush(stdout);
2280         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2281         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2282         CmiSetHandler(msg, buddyDieHandlerIdx);
2283         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2284         //send to everyone in the other world
2285         if(CmiNumPartition()!=1){
2286           //for(int i=0;i<CmiNumPes();i++){
2287             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2288             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2289             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2290             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2291           //}
2292         }
2293       }
2294         else 
2295           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2296 #endif
2297       }
2298
2299       void pingBuddy()
2300       {
2301 #if CMK_MEM_CHECKPOINT
2302         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2303         if (obj) {
2304           int buddy = obj->BuddyPE(CkMyPe());
2305           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2306           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2307           CmiSetHandler(msg, pingHandlerIdx);
2308           CmiGetRestartPhase(msg) = 9999;
2309           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2310         }
2311         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2312 #endif
2313       }
2314 #endif
2315
2316       // initproc
2317       void CkRegisterRestartHandler( )
2318       {
2319 #if CMK_MEM_CHECKPOINT
2320         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2321         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2322         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2323         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2324         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2325         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2326
2327         //for replica
2328         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2329         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2330         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2331         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2332         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2333         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2334         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2335         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2336         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2337         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2338
2339 #if CMK_CONVERSE_MPI
2340         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2341         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2342         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2343 #endif
2344
2345         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2346         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2347         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2348         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2349         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2350         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2351         CpvInitialize(int,curPointer);
2352         CpvInitialize(int,recvdLocal);
2353         CpvInitialize(int,localChkpDone);
2354         CpvInitialize(int,remoteChkpDone);
2355         CpvInitialize(int,remoteStarted);
2356         CpvInitialize(int,localStarted);
2357         CpvInitialize(int,localReady);
2358         CpvInitialize(int,remoteReady);
2359         CpvInitialize(int,recvdRemote);
2360         CpvInitialize(int,recvdProcChkp);
2361         CpvInitialize(int,localChecksum);
2362         CpvInitialize(int,remoteChecksum);
2363         CpvInitialize(int,recvdArrayChkp);
2364
2365         CpvAccess(procChkptBuf) = NULL;
2366         CpvAccess(buddyBuf) = NULL;
2367         CpvAccess(recoverProcBuf) = NULL;
2368         CpvAccess(recoverArrayBuf) = NULL;
2369         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2370         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2371         CpvAccess(chkpBuf)[0] = NULL;
2372         CpvAccess(chkpBuf)[1] = NULL;
2373         CpvAccess(localProcChkpBuf)[0] = NULL;
2374         CpvAccess(localProcChkpBuf)[1] = NULL;
2375
2376         CpvAccess(curPointer) = 0;
2377         CpvAccess(recvdLocal) = 0;
2378         CpvAccess(localChkpDone) = 0;
2379         CpvAccess(remoteChkpDone) = 0;
2380         CpvAccess(remoteStarted) = 0;
2381         CpvAccess(localStarted) = 0;
2382         CpvAccess(localReady) = 0;
2383         CpvAccess(remoteReady) = 0;
2384         CpvAccess(recvdRemote) = 0;
2385         CpvAccess(recvdProcChkp) = 0;
2386         CpvAccess(localChecksum) = 0;
2387         CpvAccess(remoteChecksum) = 0;
2388         CpvAccess(recvdArrayChkp) = 0;
2389
2390         notify_crash_fn = notify_crash;
2391
2392 #if ! CMK_CONVERSE_MPI
2393         // print pid to kill
2394         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2395         //  sleep(4);
2396 #endif
2397 #endif
2398       }
2399
2400
2401       extern "C"
2402         int CkHasCheckpoints()
2403         {
2404           return checkpointed;
2405         }
2406
2407       /// @todo: the following definitions should be moved to a separate file containing
2408       // structures and functions about fault tolerance strategies
2409
2410       /**
2411        *  * @brief: function for killing a process                                             
2412        *   */
2413 #ifdef CMK_MEM_CHECKPOINT
2414 #if CMK_HAS_GETPID
2415       void killLocal(void *_dummy,double curWallTime){
2416         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2417         if(CmiWallTimer()<killTime-1){
2418           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2419         }else{ 
2420 #if CMK_CONVERSE_MPI
2421           CkDieNow();
2422 #else 
2423           kill(getpid(),SIGKILL);                                               
2424 #endif
2425         }              
2426       } 
2427 #else
2428       void killLocal(void *_dummy,double curWallTime){
2429         CmiAbort("kill() not supported!");
2430       }
2431 #endif
2432 #endif
2433
2434 #ifdef CMK_MEM_CHECKPOINT
2435       /**
2436        * @brief: reads the file with the kill information
2437        */
2438       void readKillFile(){
2439         FILE *fp=fopen(killFile,"r");
2440         if(!fp){
2441           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2442           return;
2443         }
2444         int proc;
2445         double sec;
2446         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2447           if(proc == CkMyNode() && CkMyRank() == 0){
2448             killTime = CmiWallTimer()+sec;
2449             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2450             CcdCallFnAfter(killLocal,NULL,sec*1000);
2451           }
2452         }
2453         fclose(fp);
2454       }
2455
2456 #if ! CMK_CONVERSE_MPI
2457       void CkDieNow()
2458       {
2459 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2460         // ignored for non-mpi version
2461         CmiPrintf("[%d] die now.\n", CmiMyPe());
2462         killTime = CmiWallTimer()+0.001;
2463         CcdCallFnAfter(killLocal,NULL,1);
2464 #endif
2465       }
2466 #endif
2467
2468 #endif
2469
2470 #include "CkMemCheckpoint.def.h"
2471
2472