d678343ce6182a954c262637fe060403f31b37f3
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   if(CkMyPe()==0){
440     CkPrintf("local max iter is %d\n",localMaxIter);
441   }
442   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
443   int elemCount = CkCountChkpSyncElements();
444   if(CkMyPe()==0)
445     startTime = CmiWallTimer();
446   if(elemCount == 0){
447     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448   }
449 }
450
451 //when one replica failes, decide the next chkp iter
452
453 void CkMemCheckPT::recvIter(int iter){
454   if(maxIter<iter){
455     maxIter=iter;
456   }
457 }
458
459 void CkMemCheckPT::recvMaxIter(int iter){
460   localDecided = false;
461   if(CkMyPe()==0)
462     CkPrintf("checkpoint iteration is %d\n",iter);
463   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
464 }
465
466 void CkMemCheckPT::reachChkpIter(){
467   recvIterCount++;
468   elemCount = CkCountChkpSyncElements();
469   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
470   if(recvIterCount == elemCount){
471     recvIterCount = 0;
472     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
473   }
474 }
475
476 void CkMemCheckPT::startChkp(){
477   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
478   CkStartMemCheckpoint(cpCallback);
479 }
480
481 // called by checkpoint mgr to restore an array element
482 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
483 {
484 #if CMK_MEM_CHECKPOINT
485   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
486   // m->index.print();
487   PUP::fromMem p(m->packData);
488   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
489   CmiAssert(mgr);
490 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
491   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
492 #else
493   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
494 #endif
495
496   // find a list of array elements bound together
497   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
498   CmiAssert(elt);
499   CkLocRec_local *rec = elt->myRec;
500   CkVec<CkMigratable *> list;
501   mgr->migratableList(rec, list);
502   CmiAssert(list.length() > 0);
503   for (int l=0; l<list.length(); l++) {
504     elt = (ArrayElement *)list[l];
505     elt->budPEs[0] = m->bud1;
506     elt->budPEs[1] = m->bud2;
507     //    reset, may not needed now
508     // for now.
509     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
510       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
511       if (c) c->redNo = 0;
512     }
513   }
514 #endif
515 }
516
517 // return 1 if pe is a crashed processor
518 int CkMemCheckPT::isFailed(int pe)
519 {
520   for (int i=0; i<failedPes.length(); i++)
521     if (failedPes[i] == pe) return 1;
522   return 0;
523 }
524
525 // add pe into history list of all failed processors
526 void CkMemCheckPT::failed(int pe)
527 {
528   if (isFailed(pe)) return;
529   failedPes.push_back(pe);
530 }
531
532 int CkMemCheckPT::totalFailed()
533 {
534   return failedPes.length();
535 }
536
537 // create an checkpoint entry for array element of aid with index.
538 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
539 {
540   // error check, no duplicate
541   int idx, len = ckTable.size();
542   for (idx=0; idx<len; idx++) {
543     CkCheckPTInfo *entry = ckTable[idx];
544     if (index == entry->index) {
545       if (loc == entry->locMgr) {
546         // bindTo array elements
547         return;
548       }
549       // for array inheritance, the following check may fail
550       // because ArrayElement constructor of all superclasses are called
551       if (aid == entry->aid) {
552         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
553         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
554       }
555     }
556   }
557   CkCheckPTInfo *newEntry;
558   if (where == CkCheckPoint_inMEM)
559     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
560   else
561     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
562   ckTable.push_back(newEntry);
563   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
564 }
565
566 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
567 {
568 #if !CMK_CHKP_ALL       
569   int buddy = msg->bud1;
570   if (buddy == CkMyPe()) buddy = msg->bud2;
571   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
572   recvData(msg);
573   // ack
574   thisProxy[buddy].gotData();
575 #else
576   chkpTable[0] = NULL;
577   chkpTable[1] = NULL;
578   recvArrayCheckpoint(msg);
579   thisProxy[msg->bud2].gotData();
580 #endif
581 }
582
583 // loop through my checkpoint table and ask checkpointed array elements
584 // to send me checkpoint data.
585 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
586 void CkMemCheckPT::doItNow(int starter)
587 {
588   checkpointed = 1;
589   //cpCallback = cb;
590   cpStarter = starter;
591   inCheckpointing = 1;
592   if (CkMyPe() == cpStarter) {
593     startTime = CmiWallTimer();
594     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
595   }
596 #if CMK_CONVERSE_MPI
597   if(CmiNumPartition()==1)
598 #endif    
599   {
600
601 #if !CMK_CHKP_ALL
602     int len = ckTable.length();
603     for (int i=0; i<len; i++) {
604       CkCheckPTInfo *entry = ckTable[i];
605       // always let the bigger number processor send request
606       //if (CkMyPe() < entry->pNo) continue;
607       // always let the smaller number processor send request, may on same proc
608       if (!isMaster(entry->pNo)) continue;
609       // call inmem_checkpoint to the array element, ask it to send
610       // back checkpoint data via recvData().
611       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
612       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
613     }
614     // if my table is empty, then I am done
615     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
616 #else
617     startArrayCheckpoint();
618 #endif
619     sendProcData();
620   }
621 #if CMK_CONVERSE_MPI
622   else
623   {
624     startCheckpoint();
625   }
626 #endif
627   // pack and send proc level data
628 }
629
630 class MemElementPacker : public CkLocIterator{
631   private:
632     //    CkLocMgr *locMgr;
633     PUP::er &p;
634     std::map<CkHashCode,CkLocation> arrayMap;
635   public:
636     MemElementPacker(PUP::er &p_):p(p_){};
637     void addLocation(CkLocation &loc){
638       CkArrayIndexMax idx = loc.getIndex();
639       arrayMap[idx.hash()] = loc;
640     }
641     void writeCheckpoint(){
642       std::map<CkHashCode, CkLocation>::iterator it;
643       for(it = arrayMap.begin();it!=arrayMap.end();it++){
644         CkLocation loc = it->second;
645         CkLocMgr *locMgr = loc.getManager();
646         CkArrayIndexMax idx = loc.getIndex();
647         CkGroupID gID = locMgr->ckGetGroupID();
648         p|gID;
649         p|idx;
650         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
651       }
652     }
653 };
654
655 void pupAllElements(PUP::er &p){
656 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
657   int numElements;
658   if(!p.isUnpacking()){
659     numElements = CkCountArrayElements();
660   }
661   p | numElements;
662   if(!p.isUnpacking()){
663     MemElementPacker packer(p);
664     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
665     packer.writeCheckpoint();
666   }
667 #endif
668 }
669
670 void CkMemCheckPT::startArrayCheckpoint(){
671 #if CMK_CHKP_ALL
672   int size;
673   {
674     PUP::sizer psizer;
675     pupAllElements(psizer);
676     size = psizer.size();
677   }
678   int packSize = size/sizeof(double)+1;
679   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
680   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
681   msg->len = size;
682   msg->cp_flag = 1;
683   int budPEs[2];
684   msg->bud1=CkMyPe();
685   msg->bud2=ChkptOnPe(CkMyPe());
686   {
687     PUP::toMem p(msg->packData);
688     pupAllElements(p);
689   }
690   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
691   if(chkpTable[0]) delete chkpTable[0];
692   chkpTable[0] = msg;
693   //send the checkpoint to my 
694   recvCount++;
695 #endif
696 }
697
698 void CkMemCheckPT::startCheckpoint(){
699 #if CMK_CONVERSE_MPI
700   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
701     CkPrintf("in start checkpointing!!!!\n");
702   int size;
703   {
704     PUP::sizer p;
705     _handleProcData(p,CmiFalse);
706     size = p.size();
707   }
708   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
709   procMsg->len = size;
710   procMsg->cp_flag = 1;
711   {
712     PUP::toMem p(procMsg->packData);
713     _handleProcData(p,CmiFalse);
714   }
715   int pointer = CpvAccess(curPointer);
716   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
717   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
718
719   {
720     PUP::sizer psizer;
721     pupAllElements(psizer);
722     size = psizer.size();
723   }
724   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
725   msg->len = size;
726   msg->cp_flag = 1;
727   int checksum;
728   {
729     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
730       PUP::checker p(msg->packData);
731       pupAllElements(p);
732       checksum = p.getChecksum();
733     }else{  
734 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
735       PUP::toMem p(msg->packData);
736       pupAllElements(p);
737     }
738   }
739   pointer = CpvAccess(curPointer);
740   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
741   CpvAccess(chkpBuf)[pointer] = msg;
742   if(CkMyPe()==0)
743     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
744   if(CkReplicaAlive()==1){
745     CpvAccess(recvdLocal) = 1;
746     if(CpvAccess(use_checksum)){
747       CpvAccess(localChecksum) = checksum;
748       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
749       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
750       //only one reaplica will send
751       if(CmiMyPartition()==0){
752         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
753         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
754       }
755     }else{
756       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
757       CkPackMessage(&env);
758       if(CmiMyPartition()==0){
759         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
760         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
761       }
762     }
763     if(CmiMyPartition()==0){
764       notifyReplica = 1;
765       thisProxy[CkMyPe()].doneComparison(true);
766     }
767   }
768   if(CpvAccess(recvdRemote)==1){
769     //only partition 1 will do it
770     //compare the checkpoint 
771     int size = CpvAccess(chkpBuf)[pointer]->len;
772     if(CpvAccess(use_checksum)){
773       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
774         thisProxy[CkMyPe()].doneComparison(true);
775       }
776       else{
777         thisProxy[CkMyPe()].doneComparison(false);
778       }
779     }else{
780       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
781         thisProxy[CkMyPe()].doneComparison(true);
782       }
783       else{
784         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
785         thisProxy[CkMyPe()].doneComparison(false);
786       }
787     }
788     if(CkMyPe()==0)
789       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
790   }
791   else{
792     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
793       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
794       if(CpvAccess(remoteReady)==1){
795         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
796         {       
797           int pointer = CpvAccess(curPointer);
798           //send the proc data
799           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
800           procMsg->pointer = pointer;
801           envelope * env = (envelope *)(UsrToEnv(procMsg));
802           CkPackMessage(&env);
803           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
804           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
805           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
806             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
807           }
808         }
809         //send the array checkpoint data
810         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
811         msg->pointer = CpvAccess(curPointer);
812         envelope * env = (envelope *)(UsrToEnv(msg));
813         CkPackMessage(&env);
814         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
815         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
816         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
817           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
818       }else{
819         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
820           int pointer = CpvAccess(curPointer);
821           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
822           (CpvAccess(recoverProcBuf))->pointer = pointer;
823         }
824         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
825         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
826         CpvAccess(localReady) = 1;
827       }
828       //can continue work, no need to wait for my replica
829       int _ret = 1;
830       notifyReplica = 1;
831       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
832       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
833     }
834   }
835 #endif
836 }
837
838 void CkMemCheckPT::doneComparison(bool ret){
839   int _ret = 1;
840   if(!ret){
841     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
842     _ret = 0;
843   }else{
844     _ret = 1;
845   }
846   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
847   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
848 }
849
850 void CkMemCheckPT::doneRComparison(int ret){
851   //    if(CpvAccess(curPointer) == 0){
852   //if(ret==CkNumPes()){
853     CpvAccess(localChkpDone) = 1;
854     if(CpvAccess(remoteChkpDone) ==1){
855       thisProxy.doneBothComparison();
856     }else{
857       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
858     }
859   //}
860   /*else{
861     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
862     startTime = CmiWallTimer();
863     thisProxy.RollBack();
864   }*/
865   if(notifyReplica == 0){
866     //notify the replica am done
867     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
868     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
869     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
870     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
871     notifyReplica = 1;
872   }
873 }
874
875 void CkMemCheckPT::doneBothComparison(){
876   CpvAccess(recvdRemote) = 0;
877   CpvAccess(remoteReady) = 0;
878   CpvAccess(localReady) = 0;
879   CpvAccess(recvdLocal) = 0;
880   CpvAccess(localChkpDone) = 0;
881   CpvAccess(remoteChkpDone) = 0;
882   CpvAccess(remoteStarted) = 0;
883   CpvAccess(localStarted) = 0;
884   CpvAccess(_remoteCrashedNode) = -1;
885   CkMemCheckPT::replicaAlive = 1;
886   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
887   CpvAccess(curPointer)^=1;
888   inCheckpointing = 0;
889   notifyReplica = 0;
890   if(CkMyPe() == 0){
891     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
892   }
893   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
894 }
895
896 void CkMemCheckPT::RollBack(){
897   //restore group data
898   checkpointed = 0;
899   inRestarting = 1;
900   int pointer = CpvAccess(curPointer)^1;//use the previous one
901   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
902   PUP::fromMem p(chkpMsg->packData);    
903
904   //destroy array elements
905   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
906   int numGroups = CkpvAccess(_groupIDTable)->size();
907   for(int i=0;i<numGroups;i++) {
908     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
909     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
910     obj->flushStates();
911     obj->ckJustMigrated();
912   }
913   //restore array elements
914
915   int numElements;
916   p|numElements;
917
918   if(p.isUnpacking()){
919     for(int i=0;i<numElements;i++){
920       CkGroupID gID;
921       CkArrayIndex idx;
922       p|gID;
923       p|idx;
924       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
925       CmiAssert(mgr);
926       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
927     }
928     }
929     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
930     contribute(cb);
931   }
932
933   void CkMemCheckPT::notifyReplicaDie(int pe){
934     replicaAlive = 0;
935     CpvAccess(_remoteCrashedNode) = pe;
936   }
937
938   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
939   {
940 #if CMK_CHKP_ALL
941     int idx = 1;
942     if(msg->bud1 == CkMyPe()){
943       idx = 0;
944     }
945     int isChkpting = msg->cp_flag;
946     if(isChkpting == 1){
947       if(chkpTable[idx]) delete chkpTable[idx];
948     }
949     chkpTable[idx] = msg;
950     if(isChkpting){
951       recvCount++;
952       if(recvCount == 2){
953         if (where == CkCheckPoint_inMEM) {
954           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
955         }
956         else if (where == CkCheckPoint_inDISK) {
957           // another barrier for finalize the writing using fsync
958           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
959           contribute(0,NULL,CkReduction::sum_int,localcb);
960         }
961         else
962           CmiAbort("Unknown checkpoint scheme");
963         recvCount = 0;
964       }
965     }
966 #endif
967   }
968
969   // don't handle array elements
970   static inline void _handleProcData(PUP::er &p, CmiBool create)
971   {
972     // save readonlys, and callback BTW
973     CkPupROData(p);
974
975     // save mainchares 
976     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
977
978 #ifndef CMK_CHARE_USE_PTR
979     // save non-migratable chare
980     CkPupChareData(p);
981 #endif
982
983     // save groups into Groups.dat
984     CkPupGroupData(p,create);
985
986     // save nodegroups into NodeGroups.dat
987     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
988   }
989
990   void CkMemCheckPT::sendProcData()
991   {
992     // find out size of buffer
993     int size;
994     {
995       PUP::sizer p;
996       _handleProcData(p,CmiTrue);
997       size = p.size();
998     }
999     int packSize = size;
1000     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1001     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1002     {
1003       PUP::toMem p(msg->packData);
1004       _handleProcData(p,CmiTrue);
1005     }
1006     msg->pe = CkMyPe();
1007     msg->len = size;
1008     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1009     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1010   }
1011
1012   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1013   {
1014     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1015     CpvAccess(procChkptBuf) = msg;
1016     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1017     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1018   }
1019
1020   // ArrayElement call this function to give us the checkpointed data
1021   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1022   {
1023     int len = ckTable.length();
1024     int idx;
1025     for (idx=0; idx<len; idx++) {
1026       CkCheckPTInfo *entry = ckTable[idx];
1027       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1028     }
1029     CkAssert(idx < len);
1030     int isChkpting = msg->cp_flag;
1031     ckTable[idx]->updateBuffer(msg);
1032     if (isChkpting) {
1033       // all my array elements have returned their inmem data
1034       // inform starter processor that I am done.
1035       recvCount ++;
1036       if (recvCount == ckTable.length()) {
1037         if (where == CkCheckPoint_inMEM) {
1038           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1039         }
1040         else if (where == CkCheckPoint_inDISK) {
1041           // another barrier for finalize the writing using fsync
1042           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1043           contribute(0,NULL,CkReduction::sum_int,localcb);
1044         }
1045         else
1046           CmiAbort("Unknown checkpoint scheme");
1047         recvCount = 0;
1048       } 
1049     }
1050   }
1051
1052   // only used in disk checkpointing
1053   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1054   {
1055     delete m;
1056 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1057     system("sync");
1058 #endif
1059     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1060   }
1061
1062   // only is called on cpStarter when checkpoint is done
1063   void CkMemCheckPT::cpFinish()
1064   {
1065     CmiAssert(CkMyPe() == cpStarter);
1066     peCount++;
1067     // now that all processors have finished, activate callback
1068     if (peCount == 2) 
1069     {
1070       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1071       peCount = 0;
1072       thisProxy.report();
1073     }
1074   }
1075
1076   // for debugging, report checkpoint info
1077   void CkMemCheckPT::report()
1078   {
1079     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1080     inCheckpointing = 0;
1081 #if !CMK_CHKP_ALL
1082     int objsize = 0;
1083     int len = ckTable.length();
1084     for (int i=0; i<len; i++) {
1085       CkCheckPTInfo *entry = ckTable[i];
1086       CmiAssert(entry);
1087       objsize += entry->getSize();
1088     }
1089     CmiAssert(CpvAccess(procChkptBuf));
1090     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1091 #else
1092     if(CkMyPe()==0)
1093       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1094 #endif
1095   }
1096
1097   /*****************************************************************************
1098     RESTART Procedure
1099    *****************************************************************************/
1100
1101   // master processor of two buddies
1102   inline int CkMemCheckPT::isMaster(int buddype)
1103   {
1104 #if 0
1105     int mype = CkMyPe();
1106     //CkPrintf("ismaster: %d %d\n", pe, mype);
1107     if (CkNumPes() - totalFailed() == 2) {
1108       return mype > buddype;
1109     }
1110     for (int i=1; i<CkNumPes(); i++) {
1111       int me = (buddype+i)%CkNumPes();
1112       if (isFailed(me)) continue;
1113       if (me == mype) return 1;
1114       else return 0;
1115     }
1116     return 0;
1117 #else
1118     // smaller one
1119     int mype = CkMyPe();
1120     //CkPrintf("ismaster: %d %d\n", pe, mype);
1121     if (CkNumPes() - totalFailed() == 2) {
1122       return mype < buddype;
1123     }
1124 #if NODE_CHECKPOINT
1125     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1126     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1127 #else
1128       for (int i=1; i<CkNumPes(); i++) {
1129 #endif
1130         int me = (mype+i)%CkNumPes();
1131         if (isFailed(me)) continue;
1132         if (me == buddype) return 1;
1133         else return 0;
1134       }
1135       return 0;
1136 #endif
1137     }
1138
1139
1140
1141 #if 0
1142     // helper class to pup all elements that belong to same ckLocMgr
1143     class ElementDestoryer : public CkLocIterator {
1144       private:
1145         CkLocMgr *locMgr;
1146       public:
1147         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1148         void addLocation(CkLocation &loc) {
1149           CkArrayIndex idx=loc.getIndex();
1150           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1151           loc.destroy();
1152         }
1153     };
1154 #endif
1155
1156     // restore the bitmap vector for LB
1157     void CkMemCheckPT::resetLB(int diepe)
1158     {
1159 #if CMK_LBDB_ON
1160       int i;
1161       char *bitmap = new char[CkNumPes()];
1162       // set processor available bitmap
1163       get_avail_vector(bitmap);
1164       for (i=0; i<failedPes.length(); i++)
1165         bitmap[failedPes[i]] = 0; 
1166       bitmap[diepe] = 0;
1167
1168 #if CK_NO_PROC_POOL
1169       set_avail_vector(bitmap);
1170 #endif
1171
1172       // if I am the crashed pe, rebuild my failedPEs array
1173       if (CkMyNode() == diepe)
1174         for (i=0; i<CkNumPes(); i++) 
1175           if (bitmap[i]==0) failed(i);
1176
1177       delete [] bitmap;
1178 #endif
1179     }
1180
1181     static double restartT;
1182
1183     // in case when failedPe dies, everybody go through its checkpoint table:
1184     // destory all array elements
1185     // recover lost buddies
1186     // reconstruct all array elements from check point data
1187     // called on all processors
1188     void CkMemCheckPT::restart(int diePe)
1189     {
1190 #if CMK_MEM_CHECKPOINT
1191       thisFailedPe = diePe;
1192       double curTime = CmiWallTimer();
1193       if (CkMyPe() == thisFailedPe){
1194         restartT = CmiWallTimer();
1195         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1196       }
1197       stage = (char*)"resetLB";
1198       startTime = curTime;
1199       if (CkMyPe() == thisFailedPe)
1200         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1201
1202 //#if CK_NO_PROC_POOL
1203 //      failed(diePe);  // add into the list of failed pes
1204 //#endif
1205
1206 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1207
1208       inRestarting = 1;
1209
1210       // disable load balancer's barrier
1211       //if (CkMyPe() != diePe) resetLB(diePe);
1212
1213       CKLOCMGR_LOOP(mgr->startInserting(););
1214
1215
1216       if(CmiNumPartition()==1){
1217         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1218       }else{
1219         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1220         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1221       }
1222 #endif
1223     }
1224
1225     // loally remove all array elements
1226     void CkMemCheckPT::removeArrayElements()
1227     {
1228 #if CMK_MEM_CHECKPOINT
1229       int len = ckTable.length();
1230       double curTime = CmiWallTimer();
1231       if (CkMyPe() == thisFailedPe) 
1232         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1233       stage = (char*)"removeArrayElements";
1234       startTime = curTime;
1235
1236       //  if (cpCallback.isInvalid()) 
1237       //          CkPrintf("invalid pe %d\n",CkMyPe());
1238       //          CkAbort("Didn't set restart callback\n");;
1239       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1240
1241       // get rid of all buffering and remote recs
1242       // including destorying all array elements
1243 #if CK_NO_PROC_POOL  
1244       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1245 #else
1246       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1247 #endif
1248       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1249 #endif
1250     }
1251
1252     // flush state in reduction manager
1253     void CkMemCheckPT::resetReductionMgr()
1254     {
1255       if (CkMyPe() == thisFailedPe) 
1256         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1257       stage = (char *)"resetReductionMgr";
1258       int numGroups = CkpvAccess(_groupIDTable)->size();
1259       for(int i=0;i<numGroups;i++) {
1260         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1261         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1262         obj->flushStates();
1263         obj->ckJustMigrated();
1264       }
1265
1266       //reset CmiReduce
1267       CmiResetReductions();
1268
1269       // reset again
1270       //CpvAccess(_qd)->flushStates();
1271       if(CmiNumPartition()==1){
1272         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1273       }
1274       else{
1275         if (CkMyPe() == thisFailedPe) 
1276           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1277         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1278       }
1279     }
1280
1281     // recover the lost buddies
1282     void CkMemCheckPT::recoverBuddies()
1283     {
1284       int idx;
1285       int len = ckTable.length();
1286       // ready to flush reduction manager
1287       // cannot be CkMemCheckPT::restart because destory will modify states
1288       double curTime = CmiWallTimer();
1289       if (CkMyPe() == thisFailedPe)
1290         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1291       stage = (char *)"recoverBuddies";
1292       if (CkMyPe() == thisFailedPe)
1293         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1294       startTime = curTime;
1295
1296       // recover buddies
1297       expectCount = 0;
1298 #if !CMK_CHKP_ALL
1299       for (idx=0; idx<len; idx++) {
1300         CkCheckPTInfo *entry = ckTable[idx];
1301         if (entry->pNo == thisFailedPe) {
1302 #if CK_NO_PROC_POOL
1303           // find a new buddy
1304           int budPe = BuddyPE(CkMyPe());
1305 #else
1306           int budPe = thisFailedPe;
1307 #endif
1308           CkArrayCheckPTMessage *msg = entry->getCopy();
1309           msg->bud1 = budPe;
1310           msg->bud2 = CkMyPe();
1311           msg->cp_flag = 0;            // not checkpointing
1312           thisProxy[budPe].recoverEntry(msg);
1313           expectCount ++;
1314         }
1315       }
1316 #else
1317       //send to failed pe
1318       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1319 #if CK_NO_PROC_POOL
1320         // find a new buddy
1321         int budPe = BuddyPE(CkMyPe());
1322 #else
1323         int budPe = thisFailedPe;
1324 #endif
1325         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1326         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1327         msg->cp_flag = 0;            // not checkpointing
1328         msg->bud1 = budPe;
1329         msg->bud2 = CkMyPe();
1330         thisProxy[budPe].recoverEntry(msg);
1331         expectCount ++;
1332       }
1333 #endif
1334
1335       if (expectCount == 0) {
1336         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1337       }
1338       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1339     }
1340
1341     void CkMemCheckPT::gotData()
1342     {
1343       ackCount ++;
1344       if (ackCount == expectCount) {
1345         ackCount = 0;
1346         expectCount = -1;
1347         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1348         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1349       }
1350     }
1351
1352     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1353     {
1354
1355       for (int i=0; i<n; i++) {
1356         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1357         mgr->updateLocation(idx[i], nowOnPe);
1358       }
1359       thisProxy[nowOnPe].gotReply();
1360     }
1361
1362     // restore array elements
1363     void CkMemCheckPT::recoverArrayElements()
1364     {
1365       double curTime = CmiWallTimer();
1366       int len = ckTable.length();
1367       CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1368       stage = (char *)"recoverArrayElements";
1369       startTime = curTime;
1370       int flag = 0;
1371       // recover all array elements
1372       int count = 0;
1373
1374 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1375       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1376       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1377 #endif
1378
1379 #if !CMK_CHKP_ALL
1380       for (int idx=0; idx<len; idx++)
1381       {
1382         CkCheckPTInfo *entry = ckTable[idx];
1383 #if CK_NO_PROC_POOL
1384         // the bigger one will do 
1385         //    if (CkMyPe() < entry->pNo) continue;
1386         if (!isMaster(entry->pNo)) continue;
1387 #else
1388         // smaller one do it, which has the original object
1389         if (CkMyPe() == entry->pNo+1 || 
1390             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1391 #endif
1392         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1393
1394         entry->updateBuddy(CkMyPe(), entry->pNo);
1395         CkArrayCheckPTMessage *msg = entry->getCopy();
1396         // gzheng
1397         //thisProxy[CkMyPe()].inmem_restore(msg);
1398         inmem_restore(msg);
1399 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1400         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1401         int homePe = mgr->homePe(msg->index);
1402         if (homePe != CkMyPe()) {
1403           gmap[homePe].push_back(msg->locMgr);
1404           imap[homePe].push_back(msg->index);
1405         }
1406 #endif
1407         CkFreeMsg(msg);
1408         count ++;
1409       }
1410 #else
1411       char * packData;
1412       if(CmiNumPartition()==1){
1413         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1414         packData = (char *)msg->packData;
1415       }
1416       else{
1417         int pointer = CpvAccess(curPointer);
1418         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1419         packData = msg->packData;
1420       }
1421 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1422       recoverAll(packData,gmap,imap);
1423 #else
1424       recoverAll(packData);
1425 #endif
1426 #endif
1427 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1428       for (int i=0; i<CkNumPes(); i++) {
1429         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1430           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1431           flag++;       
1432         }
1433       }
1434       delete [] imap;
1435       delete [] gmap;
1436 #endif
1437       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1438       CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1439       //if (CkMyPe() == thisFailedPe)
1440
1441       CKLOCMGR_LOOP(mgr->doneInserting(););
1442
1443       // _crashedNode = -1;
1444       CpvAccess(_crashedNode) = -1;
1445 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1446       if (CkMyPe() == 0)
1447         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1448 #else
1449       if(flag == 0)
1450       {
1451         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1452       }
1453 #endif
1454     }
1455
1456     void CkMemCheckPT::gotReply(){
1457       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1458     }
1459
1460     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1461 #if CMK_CHKP_ALL
1462       PUP::fromMem p(packData);
1463       int numElements;
1464       p|numElements;
1465       if(p.isUnpacking()){
1466         for(int i=0;i<numElements;i++){
1467           CkGroupID gID;
1468           CkArrayIndex idx;
1469           p|gID;
1470           p|idx;
1471           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1472           int homePe = mgr->homePe(idx);
1473 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1474           mgr->resume(idx,p,CmiTrue,CmiTrue);
1475 #else
1476           if(CmiNumPartition()==1)
1477             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1478           else{
1479             if(CkMyPe()==thisFailedPe){
1480               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1481             }
1482             else{
1483               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1484             }
1485           }
1486 #endif
1487 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1488           homePe = mgr->homePe(idx);
1489           if (homePe != CkMyPe()) {
1490             gmap[homePe].push_back(gID);
1491             imap[homePe].push_back(idx);
1492           }
1493 #endif
1494         }
1495       }
1496 #endif
1497     }
1498
1499
1500     // on every processor
1501     // turn load balancer back on
1502     void CkMemCheckPT::finishUp()
1503     {
1504       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1505       //CKLOCMGR_LOOP(mgr->doneInserting(););
1506
1507       if (CkMyPe() == thisFailedPe)
1508       {
1509         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1510         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1511         fflush(stdout);
1512       }
1513       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1514       inRestarting = 0;
1515       maxIter = -1;
1516 #if CMK_CONVERSE_MPI    
1517       if(CmiNumPartition()!=1){
1518         CpvAccess(recvdProcChkp) = 0;
1519         CpvAccess(recvdArrayChkp) = 0;
1520         CpvAccess(curPointer)^=1;
1521         //notify my replica, restart is done
1522         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1523           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1524           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1525           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1526         }
1527       }
1528       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1529         lastPingTime = CmiWallTimer();
1530         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1531       }
1532 #endif
1533
1534 #if CK_NO_PROC_POOL
1535 #if NODE_CHECKPOINT
1536       int numnodes = CmiNumPhysicalNodes();
1537 #else
1538       int numnodes = CkNumPes();
1539 #endif
1540       if (numnodes-totalFailed() <=2) {
1541         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1542         _memChkptOn = 0;
1543       }
1544 #endif
1545     }
1546
1547     void CkMemCheckPT::recoverFromSoftFailure()
1548     {
1549       inRestarting = 0;
1550       maxIter = -1;
1551       CpvAccess(recvdRemote) = 0;
1552       CpvAccess(recvdLocal) = 0;
1553       CpvAccess(localChkpDone) = 0;
1554       CpvAccess(remoteChkpDone) = 0;
1555       CpvAccess(remoteReady) = 0;
1556       CpvAccess(localReady) = 0;
1557       inCheckpointing = 0;
1558       notifyReplica = 0;
1559       CpvAccess(remoteStarted) = 0;
1560       CpvAccess(localStarted) = 0;
1561       CpvAccess(_remoteCrashedNode) = -1;
1562       CkMemCheckPT::replicaAlive = 1;
1563       inCheckpointing = 0;
1564       notifyReplica = 0;
1565       if(CkMyPe() == 0){
1566         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1567       }
1568       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1569     }
1570     // called only on 0
1571     void CkMemCheckPT::quiescence(CkCallback &cb)
1572     {
1573       static int pe_count = 0;
1574       pe_count ++;
1575       CmiAssert(CkMyPe() == 0);
1576       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1577       if (pe_count == CkNumPes()) {
1578         pe_count = 0;
1579         cb.send();
1580       }
1581     }
1582
1583     // User callable function - to start a checkpoint
1584     // callback cb is used to pass control back
1585     void CkStartMemCheckpoint(CkCallback &cb)
1586     {
1587 #if CMK_MEM_CHECKPOINT
1588       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1589       /*if (_memChkptOn == 0) {
1590         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1591         cb.send();
1592         return;
1593         }*/
1594       if (CkInRestarting()) {
1595         // trying to checkpointing during restart
1596         cb.send();
1597         return;
1598       }
1599       // store user callback and user data
1600       CkMemCheckPT::cpCallback = cb;
1601
1602
1603       //send to my replica that checkpoint begins 
1604       if(CkReplicaAlive()==1){
1605         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1606         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1607         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1608       }
1609       CpvAccess(localStarted) = 1;
1610       // broadcast to start check pointing
1611       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1612         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1613         checkptMgr.doItNow(CkMyPe());
1614       }
1615 #else
1616       // when mem checkpoint is disabled, invike cb immediately
1617       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1618       cb.send();
1619 #endif
1620     }
1621
1622     void CkRestartCheckPoint(int diePe)
1623     {
1624       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1625       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1626       // broadcast
1627       checkptMgr.restart(diePe);
1628     }
1629
1630     static int _diePE = -1;
1631
1632     // callback function used locally by ccs handler
1633     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1634     {
1635       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1636       CkRestartCheckPoint(_diePE);
1637     }
1638
1639
1640     // called on crashed PE
1641     static void restartBeginHandler(char *msg)
1642     {
1643       CmiFree(msg);
1644 #if CMK_MEM_CHECKPOINT
1645 #if CMK_USE_BARRIER
1646       if(CkMyPe()!=_diePE){
1647         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1648         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1649         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1650         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1651       }else{
1652         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1653         CkRestartCheckPointCallback(NULL, NULL);
1654       }
1655 #else
1656       static int count = 0;
1657       CmiAssert(CkMyPe() == _diePE);
1658       count ++;
1659       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1660         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1661         CkRestartCheckPointCallback(NULL, NULL);
1662         count = 0;
1663       }
1664 #endif
1665 #endif
1666     }
1667
1668     extern void _discard_charm_message();
1669     extern void _resume_charm_message();
1670
1671     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1672       return data;
1673     }
1674
1675     static void restartBcastHandler(char *msg)
1676     {
1677 #if CMK_MEM_CHECKPOINT
1678       // advance phase counter
1679       CkMemCheckPT::inRestarting = 1;
1680       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1681       // gzheng
1682       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1683
1684       if (CkMyPe()==_diePE)
1685         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1686
1687       // reset QD counters
1688       /*  gzheng
1689           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1690        */
1691
1692       /*  gzheng
1693           if (CkMyPe()==_diePE)
1694           CkRestartCheckPointCallback(NULL, NULL);
1695        */
1696       CmiFree(msg);
1697
1698       _resume_charm_message();
1699
1700       // reduction
1701       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1702       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1703 #if CMK_USE_BARRIER
1704       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1705 #else
1706       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1707 #endif 
1708       checkpointed = 0;
1709 #endif
1710     }
1711
1712     extern void _initDone();
1713
1714     bool compare(char * buf1, char *buf2){
1715       if(CkMyPe()==0)
1716         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1717       PUP::checker pchecker(buf1,buf2);
1718       pchecker.skip();
1719       int numElements;
1720       pchecker|numElements;
1721       for(int i=0;i<numElements;i++){
1722         CkGroupID gID;
1723         CkArrayIndex idx;
1724
1725         pchecker|gID;
1726         pchecker|idx;
1727
1728         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1729         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1730       }
1731       if(CkMyPe()==0)
1732         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1733       //int fault_num = pchecker.getFaultNum();
1734       bool result = pchecker.getResult();
1735       /*if(!result){
1736         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1737       }*/
1738       return result;
1739     }
1740     int getChecksum(char * buf){
1741       PUP::checker pchecker(buf);
1742       pchecker.skip();
1743       int numElements;
1744       pchecker|numElements;
1745 //      CkPrintf("num %d\n",numElements);
1746       for(int i=0;i<numElements;i++){
1747         CkGroupID gID;
1748         CkArrayIndex idx;
1749
1750         pchecker|gID;
1751         pchecker|idx;
1752
1753         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1754         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1755       }
1756       return pchecker.getChecksum();
1757     }
1758
1759     static void recvRemoteChkpHandler(char *msg){
1760       CpvAccess(remoteChkpDone) = 1;
1761       if(CpvAccess(use_checksum)){
1762         if(CkMyPe()==0)
1763           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1764         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1765         CpvAccess(recvdRemote) = 1;
1766         if(CpvAccess(recvdLocal)==1){
1767           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1768             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1769           }
1770           else{
1771             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1772           }
1773         }
1774       }else{
1775         envelope *env = (envelope *)msg;
1776         CkUnpackMessage(&env);
1777         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1778         if(CpvAccess(recvdLocal)==1){
1779           int pointer = CpvAccess(curPointer);
1780           int size = CpvAccess(chkpBuf)[pointer]->len;
1781           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1782             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1783           }else
1784           {
1785             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1786           }
1787           delete chkpMsg;
1788           if(CkMyPe()==0)
1789             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1790         }else{
1791           CpvAccess(recvdRemote) = 1;
1792           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1793           CpvAccess(buddyBuf) = chkpMsg;
1794         }
1795       }
1796     }
1797
1798     static void replicaRecoverHandler(char *msg){
1799       //fflush(stdout);
1800       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1801       CpvAccess(remoteChkpDone) = 1;
1802       if(CpvAccess(localChkpDone) == 1)
1803         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1804       CmiFree(msg);
1805     }
1806
1807     static void replicaChkpDoneHandler(char *msg){
1808       CpvAccess(remoteChkpDone) = 1;
1809       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1810       if(CpvAccess(localChkpDone) == 1)
1811         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1812       CmiFree(msg);
1813     }
1814
1815     static void replicaDieHandler(char * msg){
1816 #if CMK_CONVERSE_MPI
1817       //broadcast to every one in my replica
1818       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1819       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1820 #endif
1821     }
1822
1823     static void replicaChkpStartHandler(char * msg){
1824       CpvAccess(remoteStarted) =1;
1825       if(CpvAccess(localStarted)==1){    
1826         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1827         checkptMgr.doItNow(0);
1828       }
1829     }
1830
1831
1832     static void replicaDieBcastHandler(char *msg){
1833       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1834       CpvAccess(_remoteCrashedNode) = diePe;
1835       if(CkMyPe()==diePe){
1836         CmiPrintf("pe %d in replicad word die\n",diePe);
1837         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1838         fflush(stdout);
1839       }
1840       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1841         find_spare_mpirank(diePe,CmiMyPartition()^1);
1842       }
1843       //broadcast to my partition to get local max iter
1844       if(CpvAccess(resilience)!=1){
1845         CkMemCheckPT::replicaAlive = 0;
1846         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1847       }
1848       CmiFree(msg);
1849     }
1850
1851     static void recoverRemoteProcDataHandler(char *msg){
1852       envelope *env = (envelope *)msg;
1853       CkUnpackMessage(&env);
1854       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1855
1856       //store the checkpoint
1857       int pointer = procMsg->pointer;
1858
1859
1860       if(CkMyPe()==CpvAccess(_crashedNode)){
1861         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1862         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1863         PUP::fromMem p(procMsg->packData);
1864         _handleProcData(p,CmiTrue);
1865         _initDone();
1866       }
1867       else{
1868         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1869         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1870         //_handleProcData(p,CmiFalse);
1871       }
1872       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1873       CKLOCMGR_LOOP(mgr->startInserting(););
1874
1875       CpvAccess(recvdProcChkp) =1;
1876       if(CpvAccess(recvdArrayChkp)==1){
1877         _resume_charm_message();
1878         _diePE = CpvAccess(_crashedNode);
1879         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1880         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1881         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1882 #if CMK_USE_BARRIER
1883       CmiPrintf("[%d]before reduce proc\n", CkMyPe());  
1884         if(CpvAccess(resilience)==1){
1885           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1886         }else
1887           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1888       CmiPrintf("[%d]after reduce proc\n", CkMyPe());   
1889 #else
1890         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1891 #endif 
1892       }
1893     }
1894
1895     static void recoverRemoteArrayDataHandler(char *msg){
1896       envelope *env = (envelope *)msg;
1897       CkUnpackMessage(&env);
1898       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1899
1900       //store the checkpoint
1901       int pointer = chkpMsg->pointer;
1902       CpvAccess(curPointer) = pointer;
1903       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1904       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1905       CpvAccess(recvdArrayChkp) =1;
1906       CkMemCheckPT::inRestarting = 1;
1907       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1908         _resume_charm_message();
1909         _diePE = CpvAccess(_crashedNode);
1910         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1911         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1912         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1913         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1914 #if CMK_USE_BARRIER
1915       CmiPrintf("[%d]before reduce array\n", CkMyPe()); 
1916         if(CpvAccess(resilience)==1){
1917           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1918         }else
1919           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1920       CmiPrintf("[%d]after reduce array\n", CkMyPe());  
1921 #else
1922         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1923 #endif 
1924       }
1925     }
1926
1927     static void recvPhaseHandler(char * msg)
1928     {
1929       CpvAccess(_curRestartPhase)--;
1930       CkMemCheckPT::inRestarting = 1;
1931       CmiFree(msg);
1932       //notify the buddy in the replica now i can receive the checkpoint msg
1933       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1934         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1935         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1936         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1937         //timer start
1938         if(CpvAccess(_crashedNode)==CkMyPe())
1939           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1940         if(CpvAccess(_crashedNode)==CmiMyPe())
1941           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1942       }else{
1943         CpvAccess(curPointer)^=1;
1944         CkMemCheckPT::inRestarting = 1;
1945         _resume_charm_message();
1946         _diePE = CpvAccess(_crashedNode);
1947       }
1948     }
1949     
1950     static void askRecoverDataHandler(char * msg){
1951       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1952         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1953       if(CpvAccess(resilience)!=1){
1954         CpvAccess(remoteReady)=1;
1955         if(CpvAccess(localReady)==1){
1956           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1957           {     
1958             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1959             CkPackMessage(&env);
1960             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1961             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1962             CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
1963           }
1964           //send the array checkpoint data
1965           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1966           CkPackMessage(&env);
1967           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1968           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1969           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1970             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1971         }
1972       }else{
1973         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
1974         int pointer = CpvAccess(curPointer)^1;
1975         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1976         procMsg->pointer = pointer;
1977         envelope * env = (envelope *)(UsrToEnv(procMsg));
1978         CkPackMessage(&env);
1979         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1980         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1981         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
1982         
1983         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1984         arrayMsg->pointer = pointer;
1985         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1986         CkPackMessage(&env1);
1987         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1988         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1989         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1990       }
1991     }
1992     // called on crashed processor
1993     static void recoverProcDataHandler(char *msg)
1994     {
1995 #if CMK_MEM_CHECKPOINT
1996       int i;
1997       envelope *env = (envelope *)msg;
1998       CkUnpackMessage(&env);
1999       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2000       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2001       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2002       PUP::fromMem p(procMsg->packData);
2003       _handleProcData(p,CmiTrue);
2004
2005       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2006       // gzheng
2007       CKLOCMGR_LOOP(mgr->startInserting(););
2008
2009       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2010       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2011       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2012       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2013
2014       _initDone();
2015       //   CpvAccess(_qd)->flushStates();
2016       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2017 #endif
2018     }
2019     //for replica, got the phase number from my neighbor processor in the current partition
2020     static void askPhaseHandler(char *msg)
2021     {
2022 #if CMK_MEM_CHECKPOINT
2023       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2024       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2025       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2026       CmiSetHandler(msg, recvPhaseHandlerIdx);
2027       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2028 #endif
2029     }
2030     // called on its backup processor
2031     // get backup message buffer and sent to crashed processor
2032     static void askProcDataHandler(char *msg)
2033     {
2034 #if CMK_MEM_CHECKPOINT
2035       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2036       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2037       if (CpvAccess(procChkptBuf) == NULL)  {
2038         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2039         CkAbort("no checkpoint found");
2040       }
2041       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2042       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2043
2044       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2045
2046       CkPackMessage(&env);
2047       CmiSetHandler(env, recoverProcDataHandlerIdx);
2048       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2049       CpvAccess(procChkptBuf) = NULL;
2050       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2051 #endif
2052     }
2053
2054     // called on PE 0
2055     void qd_callback(void *m)
2056     {
2057       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2058       fflush(stdout);
2059       CkFreeMsg(m);
2060       if(CmiNumPartition()==1){
2061 #ifdef CMK_SMP
2062         for(int i=0;i<CmiMyNodeSize();i++){
2063           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2064           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2065           CmiSetHandler(msg, askProcDataHandlerIdx);
2066           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2067           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2068         }
2069         return;
2070 #endif
2071         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2072         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2073         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2074         CmiSetHandler(msg, askProcDataHandlerIdx);
2075         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2076         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2077       }
2078       else{
2079         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2080         CmiSetHandler(msg, recvPhaseHandlerIdx);
2081         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2082       }
2083     }
2084
2085     // on crashed node
2086     void CkMemRestart(const char *dummy, CkArgMsg *args)
2087     {
2088 #if CMK_MEM_CHECKPOINT
2089       _diePE = CmiMyNode();
2090       CpvAccess( _crashedNode )= CmiMyNode();
2091       CkMemCheckPT::inRestarting = 1;
2092       _discard_charm_message();
2093
2094       /*if(CmiMyRank()==0){
2095         CkCallback cb(qd_callback);
2096         CkStartQD(cb);
2097         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2098         }*/
2099       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2100       if(CmiNumPartition()==1){
2101         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2102         restartT = CmiWallTimer();
2103         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2104         fflush(stdout);
2105         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2106         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2107         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2108         CmiSetHandler(msg, askProcDataHandlerIdx);
2109         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2110         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2111       }
2112       else{
2113         CkCallback cb(qd_callback);
2114         CkStartQD(cb);
2115       }
2116 #else
2117       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2118 #endif
2119     }
2120
2121     // can be called in other files
2122     // return true if it is in restarting
2123     extern "C"
2124       int CkInRestarting()
2125       {
2126 #if CMK_MEM_CHECKPOINT
2127         if (CpvAccess( _crashedNode)!=-1) return 1;
2128         // gzheng
2129         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2130         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2131         return CkMemCheckPT::inRestarting;
2132 #else
2133         return 0;
2134 #endif
2135       }
2136
2137     extern "C"
2138       int CkReplicaAlive()
2139       {
2140         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2141         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2142         return CkMemCheckPT::replicaAlive;
2143
2144         /*if(CkMemCheckPT::replicaDead==1)
2145           return 0;
2146           else          
2147           return 1;*/
2148       }
2149
2150     extern "C"
2151       int CkInCheckpointing()
2152       {
2153         return CkMemCheckPT::inCheckpointing;
2154       }
2155
2156     extern "C"
2157       void CkSetInLdb(){
2158 #if CMK_MEM_CHECKPOINT
2159         CkMemCheckPT::inLoadbalancing = 1;
2160 #endif
2161       }
2162
2163     extern "C"
2164       int CkInLdb(){
2165 #if CMK_MEM_CHECKPOINT
2166         return CkMemCheckPT::inLoadbalancing;
2167 #endif
2168         return 0;
2169       }
2170
2171     extern "C"
2172       void CkResetInLdb(){
2173 #if CMK_MEM_CHECKPOINT
2174         CkMemCheckPT::inLoadbalancing = 0;
2175 #endif
2176       }
2177
2178     /*****************************************************************************
2179       module initialization
2180      *****************************************************************************/
2181
2182     static int arg_where = CkCheckPoint_inMEM;
2183
2184 #if CMK_MEM_CHECKPOINT
2185     void init_memcheckpt(char **argv)
2186     {
2187       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2188         arg_where = CkCheckPoint_inDISK;
2189       }
2190       CpvInitialize(int, use_checksum);
2191       CpvInitialize(int, resilience);
2192       CpvAccess(use_checksum)=0;
2193       CpvAccess(resilience)=0;
2194       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2195         CpvAccess(use_checksum)=1;
2196       }
2197       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2198         CpvAccess(resilience)=1;
2199       }
2200       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2201         CpvAccess(resilience)=2;
2202       }
2203       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2204         CpvAccess(resilience)=3;
2205       }
2206       // initiliazing _crashedNode variable
2207       CpvInitialize(int, _crashedNode);
2208       CpvInitialize(int, _remoteCrashedNode);
2209       CpvAccess(_crashedNode) = -1;
2210       CpvAccess(_remoteCrashedNode) = -1;
2211       init_FI(argv);
2212     }
2213 #endif
2214
2215     class CkMemCheckPTInit: public Chare {
2216       public:
2217         CkMemCheckPTInit(CkArgMsg *m) {
2218 #if CMK_MEM_CHECKPOINT
2219           if (arg_where == CkCheckPoint_inDISK) {
2220             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2221           }
2222           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2223           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2224 #endif
2225         }
2226     };
2227
2228     static void notifyHandler(char *msg)
2229     {
2230 #if CMK_MEM_CHECKPOINT
2231       CmiFree(msg);
2232       /* immediately increase restart phase to filter old messages */
2233       CpvAccess(_curRestartPhase) ++;
2234       CpvAccess(_qd)->flushStates();
2235       _discard_charm_message();
2236
2237 #endif
2238     }
2239
2240     extern "C"
2241       void notify_crash(int node)
2242       {
2243 #ifdef CMK_MEM_CHECKPOINT
2244         CpvAccess( _crashedNode) = node;
2245 #ifdef CMK_SMP
2246         for(int i=0;i<CkMyNodeSize();i++){
2247           CpvAccessOther(_crashedNode,i)=node;
2248         }
2249 #endif
2250         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2251         CkMemCheckPT::inRestarting = 1;
2252
2253         // this may be in interrupt handler, send a message to reset QD
2254         int pe = CmiNodeFirst(CkMyNode());
2255         for(int i=0;i<CkMyNodeSize();i++){
2256           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2257           CmiSetHandler(msg, notifyHandlerIdx);
2258           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2259         }
2260 #endif
2261       }
2262
2263     extern "C" void (*notify_crash_fn)(int node);
2264
2265
2266 #if CMK_CONVERSE_MPI
2267     void buddyDieHandler(char *msg)
2268     {
2269 #if CMK_MEM_CHECKPOINT
2270       // notify
2271       CkMemCheckPT::inRestarting = 1;
2272       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2273       notify_crash(diepe);
2274       // send message to crash pe to let it restart
2275       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2276       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2277       int buddy = obj->BuddyPE(CmiMyPe());
2278       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2279       if (buddy == diepe)  {
2280         mpi_restart_crashed(diepe, newrank);
2281       }
2282 #endif
2283     }
2284
2285     void pingHandler(void *msg)
2286     {
2287       lastPingTime = CmiWallTimer();
2288       CmiFree(msg);
2289     }
2290
2291     void pingCheckHandler()
2292     {
2293 #if CMK_MEM_CHECKPOINT
2294       double now = CmiWallTimer();
2295       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2296         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2297         int i, pe, buddy;
2298         // tell everyone the buddy dies
2299         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2300         for (i = 1; i < CmiNumPes(); i++) {
2301           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2302           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2303         }
2304         buddy = pe;
2305         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2306         fflush(stdout);
2307         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2308         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2309         CmiSetHandler(msg, buddyDieHandlerIdx);
2310         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2311         //send to everyone in the other world
2312         if(CmiNumPartition()!=1){
2313           //for(int i=0;i<CmiNumPes();i++){
2314             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2315             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2316             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2317             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2318           //}
2319         }
2320       }
2321         else 
2322           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2323 #endif
2324       }
2325
2326       void pingBuddy()
2327       {
2328 #if CMK_MEM_CHECKPOINT
2329         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2330         if (obj) {
2331           int buddy = obj->BuddyPE(CkMyPe());
2332           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2333           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2334           CmiSetHandler(msg, pingHandlerIdx);
2335           CmiGetRestartPhase(msg) = 9999;
2336           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2337         }
2338         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2339 #endif
2340       }
2341 #endif
2342
2343       // initproc
2344       void CkRegisterRestartHandler( )
2345       {
2346 #if CMK_MEM_CHECKPOINT
2347         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2348         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2349         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2350         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2351         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2352         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2353
2354         //for replica
2355         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2356         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2357         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2358         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2359         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2360         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2361         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2362         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2363         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2364         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2365
2366 #if CMK_CONVERSE_MPI
2367         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2368         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2369         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2370 #endif
2371
2372         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2373         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2374         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2375         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2376         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2377         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2378         CpvInitialize(int,curPointer);
2379         CpvInitialize(int,recvdLocal);
2380         CpvInitialize(int,localChkpDone);
2381         CpvInitialize(int,remoteChkpDone);
2382         CpvInitialize(int,remoteStarted);
2383         CpvInitialize(int,localStarted);
2384         CpvInitialize(int,localReady);
2385         CpvInitialize(int,remoteReady);
2386         CpvInitialize(int,recvdRemote);
2387         CpvInitialize(int,recvdProcChkp);
2388         CpvInitialize(int,localChecksum);
2389         CpvInitialize(int,remoteChecksum);
2390         CpvInitialize(int,recvdArrayChkp);
2391
2392         CpvAccess(procChkptBuf) = NULL;
2393         CpvAccess(buddyBuf) = NULL;
2394         CpvAccess(recoverProcBuf) = NULL;
2395         CpvAccess(recoverArrayBuf) = NULL;
2396         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2397         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2398         CpvAccess(chkpBuf)[0] = NULL;
2399         CpvAccess(chkpBuf)[1] = NULL;
2400         CpvAccess(localProcChkpBuf)[0] = NULL;
2401         CpvAccess(localProcChkpBuf)[1] = NULL;
2402
2403         CpvAccess(curPointer) = 0;
2404         CpvAccess(recvdLocal) = 0;
2405         CpvAccess(localChkpDone) = 0;
2406         CpvAccess(remoteChkpDone) = 0;
2407         CpvAccess(remoteStarted) = 0;
2408         CpvAccess(localStarted) = 0;
2409         CpvAccess(localReady) = 0;
2410         CpvAccess(remoteReady) = 0;
2411         CpvAccess(recvdRemote) = 0;
2412         CpvAccess(recvdProcChkp) = 0;
2413         CpvAccess(localChecksum) = 0;
2414         CpvAccess(remoteChecksum) = 0;
2415         CpvAccess(recvdArrayChkp) = 0;
2416
2417         notify_crash_fn = notify_crash;
2418
2419 #if ! CMK_CONVERSE_MPI
2420         // print pid to kill
2421         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2422         //  sleep(4);
2423 #endif
2424 #endif
2425       }
2426
2427
2428       extern "C"
2429         int CkHasCheckpoints()
2430         {
2431           return checkpointed;
2432         }
2433
2434       /// @todo: the following definitions should be moved to a separate file containing
2435       // structures and functions about fault tolerance strategies
2436
2437       /**
2438        *  * @brief: function for killing a process                                             
2439        *   */
2440 #ifdef CMK_MEM_CHECKPOINT
2441 #if CMK_HAS_GETPID
2442       void killLocal(void *_dummy,double curWallTime){
2443         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2444         if(CmiWallTimer()<killTime-1){
2445           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2446         }else{ 
2447 #if CMK_CONVERSE_MPI
2448           CkDieNow();
2449 #else 
2450           kill(getpid(),SIGKILL);                                               
2451 #endif
2452         }              
2453       } 
2454 #else
2455       void killLocal(void *_dummy,double curWallTime){
2456         CmiAbort("kill() not supported!");
2457       }
2458 #endif
2459 #endif
2460
2461 #ifdef CMK_MEM_CHECKPOINT
2462       /**
2463        * @brief: reads the file with the kill information
2464        */
2465       void readKillFile(){
2466         FILE *fp=fopen(killFile,"r");
2467         if(!fp){
2468           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2469           return;
2470         }
2471         int proc;
2472         double sec;
2473         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2474           if(proc == CkMyNode() && CkMyRank() == 0){
2475             killTime = CmiWallTimer()+sec;
2476             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2477             CcdCallFnAfter(killLocal,NULL,sec*1000);
2478           }
2479         }
2480         fclose(fp);
2481       }
2482
2483 #if ! CMK_CONVERSE_MPI
2484       void CkDieNow()
2485       {
2486 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2487         // ignored for non-mpi version
2488         CmiPrintf("[%d] die now.\n", CmiMyPe());
2489         killTime = CmiWallTimer()+0.001;
2490         CcdCallFnAfter(killLocal,NULL,1);
2491 #endif
2492       }
2493 #endif
2494
2495 #endif
2496
2497 #include "CkMemCheckpoint.def.h"
2498
2499