fix for migration
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   if(CkMyPe()==0){
440     CkPrintf("local max iter is %d\n",localMaxIter);
441   }
442   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
443   int elemCount = CkCountChkpSyncElements();
444   if(CkMyPe()==0)
445     startTime = CmiWallTimer();
446   if(elemCount == 0){
447     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448   }
449 }
450
451 //when one replica failes, decide the next chkp iter
452
453 void CkMemCheckPT::recvIter(int iter){
454   if(maxIter<iter){
455     maxIter=iter;
456   }
457 }
458
459 void CkMemCheckPT::recvMaxIter(int iter){
460   localDecided = false;
461   if(CkMyPe()==0)
462     CkPrintf("checkpoint iteration is %d\n",iter);
463   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
464 }
465
466 void CkMemCheckPT::reachChkpIter(){
467   recvIterCount++;
468   elemCount = CkCountChkpSyncElements();
469   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
470   if(recvIterCount == elemCount){
471     recvIterCount = 0;
472     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
473   }
474 }
475
476 void CkMemCheckPT::startChkp(){
477   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
478   CkStartMemCheckpoint(cpCallback);
479 }
480
481 // called by checkpoint mgr to restore an array element
482 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
483 {
484 #if CMK_MEM_CHECKPOINT
485   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
486   // m->index.print();
487   PUP::fromMem p(m->packData);
488   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
489   CmiAssert(mgr);
490 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
491   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
492 #else
493   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
494 #endif
495
496   // find a list of array elements bound together
497   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
498   CmiAssert(elt);
499   CkLocRec_local *rec = elt->myRec;
500   CkVec<CkMigratable *> list;
501   mgr->migratableList(rec, list);
502   CmiAssert(list.length() > 0);
503   for (int l=0; l<list.length(); l++) {
504     elt = (ArrayElement *)list[l];
505     elt->budPEs[0] = m->bud1;
506     elt->budPEs[1] = m->bud2;
507     //    reset, may not needed now
508     // for now.
509     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
510       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
511       if (c) c->redNo = 0;
512     }
513   }
514 #endif
515 }
516
517 // return 1 if pe is a crashed processor
518 int CkMemCheckPT::isFailed(int pe)
519 {
520   for (int i=0; i<failedPes.length(); i++)
521     if (failedPes[i] == pe) return 1;
522   return 0;
523 }
524
525 // add pe into history list of all failed processors
526 void CkMemCheckPT::failed(int pe)
527 {
528   if (isFailed(pe)) return;
529   failedPes.push_back(pe);
530 }
531
532 int CkMemCheckPT::totalFailed()
533 {
534   return failedPes.length();
535 }
536
537 // create an checkpoint entry for array element of aid with index.
538 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
539 {
540   // error check, no duplicate
541   int idx, len = ckTable.size();
542   for (idx=0; idx<len; idx++) {
543     CkCheckPTInfo *entry = ckTable[idx];
544     if (index == entry->index) {
545       if (loc == entry->locMgr) {
546         // bindTo array elements
547         return;
548       }
549       // for array inheritance, the following check may fail
550       // because ArrayElement constructor of all superclasses are called
551       if (aid == entry->aid) {
552         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
553         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
554       }
555     }
556   }
557   CkCheckPTInfo *newEntry;
558   if (where == CkCheckPoint_inMEM)
559     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
560   else
561     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
562   ckTable.push_back(newEntry);
563   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
564 }
565
566 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
567 {
568 #if !CMK_CHKP_ALL       
569   int buddy = msg->bud1;
570   if (buddy == CkMyPe()) buddy = msg->bud2;
571   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
572   recvData(msg);
573   // ack
574   thisProxy[buddy].gotData();
575 #else
576   chkpTable[0] = NULL;
577   chkpTable[1] = NULL;
578   recvArrayCheckpoint(msg);
579   thisProxy[msg->bud2].gotData();
580 #endif
581 }
582
583 // loop through my checkpoint table and ask checkpointed array elements
584 // to send me checkpoint data.
585 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
586 void CkMemCheckPT::doItNow(int starter)
587 {
588   checkpointed = 1;
589   //cpCallback = cb;
590   cpStarter = starter;
591   inCheckpointing = 1;
592   if (CkMyPe() == cpStarter) {
593     startTime = CmiWallTimer();
594     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
595   }
596 #if CMK_CONVERSE_MPI
597   if(CmiNumPartition()==1)
598 #endif    
599   {
600
601 #if !CMK_CHKP_ALL
602     int len = ckTable.length();
603     for (int i=0; i<len; i++) {
604       CkCheckPTInfo *entry = ckTable[i];
605       // always let the bigger number processor send request
606       //if (CkMyPe() < entry->pNo) continue;
607       // always let the smaller number processor send request, may on same proc
608       if (!isMaster(entry->pNo)) continue;
609       // call inmem_checkpoint to the array element, ask it to send
610       // back checkpoint data via recvData().
611       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
612       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
613     }
614     // if my table is empty, then I am done
615     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
616 #else
617     startArrayCheckpoint();
618 #endif
619     sendProcData();
620   }
621 #if CMK_CONVERSE_MPI
622   else
623   {
624     startCheckpoint();
625   }
626 #endif
627   // pack and send proc level data
628 }
629
630 class MemElementPacker : public CkLocIterator{
631   private:
632     //    CkLocMgr *locMgr;
633     PUP::er &p;
634     std::map<CkHashCode,CkLocation> arrayMap;
635   public:
636     MemElementPacker(PUP::er &p_):p(p_){};
637     void addLocation(CkLocation &loc){
638       CkArrayIndexMax idx = loc.getIndex();
639       arrayMap[idx.hash()] = loc;
640     }
641     void writeCheckpoint(){
642       std::map<CkHashCode, CkLocation>::iterator it;
643       for(it = arrayMap.begin();it!=arrayMap.end();it++){
644         CkLocation loc = it->second;
645         CkLocMgr *locMgr = loc.getManager();
646         CkArrayIndexMax idx = loc.getIndex();
647         CkGroupID gID = locMgr->ckGetGroupID();
648         p|gID;
649         p|idx;
650         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
651       }
652     }
653 };
654
655 void pupAllElements(PUP::er &p){
656 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
657   int numElements;
658   if(!p.isUnpacking()){
659     numElements = CkCountArrayElements();
660   }
661   p | numElements;
662   if(!p.isUnpacking()){
663     MemElementPacker packer(p);
664     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
665     packer.writeCheckpoint();
666   }
667 #endif
668 }
669
670 void CkMemCheckPT::startArrayCheckpoint(){
671 #if CMK_CHKP_ALL
672   int size;
673   {
674     PUP::sizer psizer;
675     pupAllElements(psizer);
676     size = psizer.size();
677   }
678   int packSize = size/sizeof(double)+1;
679   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
680   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
681   msg->len = size;
682   msg->cp_flag = 1;
683   int budPEs[2];
684   msg->bud1=CkMyPe();
685   msg->bud2=ChkptOnPe(CkMyPe());
686   {
687     PUP::toMem p(msg->packData);
688     pupAllElements(p);
689   }
690   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
691   if(chkpTable[0]) delete chkpTable[0];
692   chkpTable[0] = msg;
693   //send the checkpoint to my 
694   recvCount++;
695 #endif
696 }
697
698 void CkMemCheckPT::startCheckpoint(){
699 #if CMK_CONVERSE_MPI
700   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
701     CkPrintf("in start checkpointing!!!!\n");
702   int size;
703   {
704     PUP::sizer p;
705     _handleProcData(p,CmiFalse);
706     size = p.size();
707   }
708   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
709   procMsg->len = size;
710   procMsg->cp_flag = 1;
711   {
712     PUP::toMem p(procMsg->packData);
713     _handleProcData(p,CmiFalse);
714   }
715   int pointer = CpvAccess(curPointer);
716   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
717   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
718
719   {
720     PUP::sizer psizer;
721     pupAllElements(psizer);
722     size = psizer.size();
723   }
724   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
725   msg->len = size;
726   msg->cp_flag = 1;
727   int checksum;
728   {
729     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
730       PUP::checker p(msg->packData);
731       pupAllElements(p);
732       checksum = p.getChecksum();
733     }else{  
734 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
735       PUP::toMem p(msg->packData);
736       pupAllElements(p);
737     }
738   }
739   pointer = CpvAccess(curPointer);
740   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
741   CpvAccess(chkpBuf)[pointer] = msg;
742   if(CkMyPe()==0)
743     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
744   if(CkReplicaAlive()==1){
745     CpvAccess(recvdLocal) = 1;
746     if(CpvAccess(use_checksum)){
747       CpvAccess(localChecksum) = checksum;
748       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
749       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
750       //only one reaplica will send
751       if(CmiMyPartition()==0){
752         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
753         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
754       }
755     }else{
756       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
757       CkPackMessage(&env);
758       if(CmiMyPartition()==0){
759         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
760         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
761       }
762     }
763     if(CmiMyPartition()==0){
764       notifyReplica = 1;
765       thisProxy[CkMyPe()].doneComparison(true);
766     }
767   }
768   if(CpvAccess(recvdRemote)==1){
769     //only partition 1 will do it
770     //compare the checkpoint 
771     int size = CpvAccess(chkpBuf)[pointer]->len;
772     if(CpvAccess(use_checksum)){
773       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
774         thisProxy[CkMyPe()].doneComparison(true);
775       }
776       else{
777         thisProxy[CkMyPe()].doneComparison(false);
778       }
779     }else{
780       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
781         thisProxy[CkMyPe()].doneComparison(true);
782       }
783       else{
784         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
785         thisProxy[CkMyPe()].doneComparison(false);
786       }
787     }
788     if(CkMyPe()==0)
789       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
790   }
791   else{
792     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
793       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
794       if(CpvAccess(remoteReady)==1){
795         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
796         {       
797           int pointer = CpvAccess(curPointer);
798           //send the proc data
799           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
800           procMsg->pointer = pointer;
801           envelope * env = (envelope *)(UsrToEnv(procMsg));
802           CkPackMessage(&env);
803           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
804           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
805           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
806             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
807           }
808         }
809         //send the array checkpoint data
810         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
811         msg->pointer = CpvAccess(curPointer);
812         envelope * env = (envelope *)(UsrToEnv(msg));
813         CkPackMessage(&env);
814         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
815         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
816         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
817           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
818       }else{
819         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
820           int pointer = CpvAccess(curPointer);
821           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
822           (CpvAccess(recoverProcBuf))->pointer = pointer;
823         }
824         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
825         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
826         CpvAccess(localReady) = 1;
827       }
828       //can continue work, no need to wait for my replica
829       int _ret = 1;
830       notifyReplica = 1;
831       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
832       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
833     }
834   }
835 #endif
836 }
837
838 void CkMemCheckPT::doneComparison(bool ret){
839   int _ret = 1;
840   if(!ret){
841     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
842     _ret = 0;
843   }else{
844     _ret = 1;
845   }
846   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
847   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
848 }
849
850 void CkMemCheckPT::doneRComparison(int ret){
851   //    if(CpvAccess(curPointer) == 0){
852   //if(ret==CkNumPes()){
853     CpvAccess(localChkpDone) = 1;
854     if(CpvAccess(remoteChkpDone) ==1){
855       thisProxy.doneBothComparison();
856     }else{
857       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
858     }
859   //}
860   /*else{
861     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
862     startTime = CmiWallTimer();
863     thisProxy.RollBack();
864   }*/
865   if(notifyReplica == 0){
866     //notify the replica am done
867     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
868     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
869     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
870     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
871     notifyReplica = 1;
872   }
873 }
874
875 void CkMemCheckPT::doneBothComparison(){
876   CpvAccess(recvdRemote) = 0;
877   CpvAccess(remoteReady) = 0;
878   CpvAccess(localReady) = 0;
879   CpvAccess(recvdLocal) = 0;
880   CpvAccess(localChkpDone) = 0;
881   CpvAccess(remoteChkpDone) = 0;
882   CpvAccess(remoteStarted) = 0;
883   CpvAccess(localStarted) = 0;
884   CpvAccess(_remoteCrashedNode) = -1;
885   CkMemCheckPT::replicaAlive = 1;
886   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
887   CpvAccess(curPointer)^=1;
888   inCheckpointing = 0;
889   notifyReplica = 0;
890   if(CkMyPe() == 0){
891     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
892   }
893   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
894 }
895
896 void CkMemCheckPT::RollBack(){
897   //restore group data
898   checkpointed = 0;
899   inRestarting = 1;
900   int pointer = CpvAccess(curPointer)^1;//use the previous one
901   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
902   PUP::fromMem p(chkpMsg->packData);    
903
904   //destroy array elements
905   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
906   int numGroups = CkpvAccess(_groupIDTable)->size();
907   for(int i=0;i<numGroups;i++) {
908     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
909     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
910     obj->flushStates();
911     obj->ckJustMigrated();
912   }
913   //restore array elements
914
915   int numElements;
916   p|numElements;
917
918   if(p.isUnpacking()){
919     for(int i=0;i<numElements;i++){
920       CkGroupID gID;
921       CkArrayIndex idx;
922       p|gID;
923       p|idx;
924       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
925       CmiAssert(mgr);
926       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
927     }
928     }
929     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
930     contribute(cb);
931   }
932
933   void CkMemCheckPT::notifyReplicaDie(int pe){
934     replicaAlive = 0;
935     CpvAccess(_remoteCrashedNode) = pe;
936   }
937
938   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
939   {
940 #if CMK_CHKP_ALL
941     int idx = 1;
942     if(msg->bud1 == CkMyPe()){
943       idx = 0;
944     }
945     int isChkpting = msg->cp_flag;
946     if(isChkpting == 1){
947       if(chkpTable[idx]) delete chkpTable[idx];
948     }
949     chkpTable[idx] = msg;
950     if(isChkpting){
951       recvCount++;
952       if(recvCount == 2){
953         if (where == CkCheckPoint_inMEM) {
954           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
955         }
956         else if (where == CkCheckPoint_inDISK) {
957           // another barrier for finalize the writing using fsync
958           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
959           contribute(0,NULL,CkReduction::sum_int,localcb);
960         }
961         else
962           CmiAbort("Unknown checkpoint scheme");
963         recvCount = 0;
964       }
965     }
966 #endif
967   }
968
969   // don't handle array elements
970   static inline void _handleProcData(PUP::er &p, CmiBool create)
971   {
972     // save readonlys, and callback BTW
973     CkPupROData(p);
974
975     // save mainchares 
976     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
977
978 #ifndef CMK_CHARE_USE_PTR
979     // save non-migratable chare
980     CkPupChareData(p);
981 #endif
982
983     // save groups into Groups.dat
984     CkPupGroupData(p,create);
985
986     // save nodegroups into NodeGroups.dat
987     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
988   }
989
990   void CkMemCheckPT::sendProcData()
991   {
992     // find out size of buffer
993     int size;
994     {
995       PUP::sizer p;
996       _handleProcData(p,CmiTrue);
997       size = p.size();
998     }
999     int packSize = size;
1000     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1001     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1002     {
1003       PUP::toMem p(msg->packData);
1004       _handleProcData(p,CmiTrue);
1005     }
1006     msg->pe = CkMyPe();
1007     msg->len = size;
1008     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1009     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1010   }
1011
1012   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1013   {
1014     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1015     CpvAccess(procChkptBuf) = msg;
1016     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1017     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1018   }
1019
1020   // ArrayElement call this function to give us the checkpointed data
1021   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1022   {
1023     int len = ckTable.length();
1024     int idx;
1025     for (idx=0; idx<len; idx++) {
1026       CkCheckPTInfo *entry = ckTable[idx];
1027       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1028     }
1029     CkAssert(idx < len);
1030     int isChkpting = msg->cp_flag;
1031     ckTable[idx]->updateBuffer(msg);
1032     if (isChkpting) {
1033       // all my array elements have returned their inmem data
1034       // inform starter processor that I am done.
1035       recvCount ++;
1036       if (recvCount == ckTable.length()) {
1037         if (where == CkCheckPoint_inMEM) {
1038           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1039         }
1040         else if (where == CkCheckPoint_inDISK) {
1041           // another barrier for finalize the writing using fsync
1042           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1043           contribute(0,NULL,CkReduction::sum_int,localcb);
1044         }
1045         else
1046           CmiAbort("Unknown checkpoint scheme");
1047         recvCount = 0;
1048       } 
1049     }
1050   }
1051
1052   // only used in disk checkpointing
1053   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1054   {
1055     delete m;
1056 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1057     system("sync");
1058 #endif
1059     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1060   }
1061
1062   // only is called on cpStarter when checkpoint is done
1063   void CkMemCheckPT::cpFinish()
1064   {
1065     CmiAssert(CkMyPe() == cpStarter);
1066     peCount++;
1067     // now that all processors have finished, activate callback
1068     if (peCount == 2) 
1069     {
1070       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1071       peCount = 0;
1072       thisProxy.report();
1073     }
1074   }
1075
1076   // for debugging, report checkpoint info
1077   void CkMemCheckPT::report()
1078   {
1079     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1080     inCheckpointing = 0;
1081 #if !CMK_CHKP_ALL
1082     int objsize = 0;
1083     int len = ckTable.length();
1084     for (int i=0; i<len; i++) {
1085       CkCheckPTInfo *entry = ckTable[i];
1086       CmiAssert(entry);
1087       objsize += entry->getSize();
1088     }
1089     CmiAssert(CpvAccess(procChkptBuf));
1090     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1091 #else
1092     if(CkMyPe()==0)
1093       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1094 #endif
1095   }
1096
1097   /*****************************************************************************
1098     RESTART Procedure
1099    *****************************************************************************/
1100
1101   // master processor of two buddies
1102   inline int CkMemCheckPT::isMaster(int buddype)
1103   {
1104 #if 0
1105     int mype = CkMyPe();
1106     //CkPrintf("ismaster: %d %d\n", pe, mype);
1107     if (CkNumPes() - totalFailed() == 2) {
1108       return mype > buddype;
1109     }
1110     for (int i=1; i<CkNumPes(); i++) {
1111       int me = (buddype+i)%CkNumPes();
1112       if (isFailed(me)) continue;
1113       if (me == mype) return 1;
1114       else return 0;
1115     }
1116     return 0;
1117 #else
1118     // smaller one
1119     int mype = CkMyPe();
1120     //CkPrintf("ismaster: %d %d\n", pe, mype);
1121     if (CkNumPes() - totalFailed() == 2) {
1122       return mype < buddype;
1123     }
1124 #if NODE_CHECKPOINT
1125     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1126     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1127 #else
1128       for (int i=1; i<CkNumPes(); i++) {
1129 #endif
1130         int me = (mype+i)%CkNumPes();
1131         if (isFailed(me)) continue;
1132         if (me == buddype) return 1;
1133         else return 0;
1134       }
1135       return 0;
1136 #endif
1137     }
1138
1139
1140
1141 #if 0
1142     // helper class to pup all elements that belong to same ckLocMgr
1143     class ElementDestoryer : public CkLocIterator {
1144       private:
1145         CkLocMgr *locMgr;
1146       public:
1147         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1148         void addLocation(CkLocation &loc) {
1149           CkArrayIndex idx=loc.getIndex();
1150           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1151           loc.destroy();
1152         }
1153     };
1154 #endif
1155
1156     // restore the bitmap vector for LB
1157     void CkMemCheckPT::resetLB(int diepe)
1158     {
1159 #if CMK_LBDB_ON
1160       int i;
1161       char *bitmap = new char[CkNumPes()];
1162       // set processor available bitmap
1163       get_avail_vector(bitmap);
1164       for (i=0; i<failedPes.length(); i++)
1165         bitmap[failedPes[i]] = 0; 
1166       bitmap[diepe] = 0;
1167
1168 #if CK_NO_PROC_POOL
1169       set_avail_vector(bitmap);
1170 #endif
1171
1172       // if I am the crashed pe, rebuild my failedPEs array
1173       if (CkMyNode() == diepe)
1174         for (i=0; i<CkNumPes(); i++) 
1175           if (bitmap[i]==0) failed(i);
1176
1177       delete [] bitmap;
1178 #endif
1179     }
1180
1181     static double restartT;
1182
1183     // in case when failedPe dies, everybody go through its checkpoint table:
1184     // destory all array elements
1185     // recover lost buddies
1186     // reconstruct all array elements from check point data
1187     // called on all processors
1188     void CkMemCheckPT::restart(int diePe)
1189     {
1190 #if CMK_MEM_CHECKPOINT
1191       thisFailedPe = diePe;
1192       double curTime = CmiWallTimer();
1193       if (CkMyPe() == thisFailedPe){
1194         restartT = CmiWallTimer();
1195         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1196       }
1197       stage = (char*)"resetLB";
1198       startTime = curTime;
1199       if (CkMyPe() == thisFailedPe)
1200         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1201
1202 //#if CK_NO_PROC_POOL
1203 //      failed(diePe);  // add into the list of failed pes
1204 //#endif
1205
1206 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1207
1208       inRestarting = 1;
1209
1210       // disable load balancer's barrier
1211       //if (CkMyPe() != diePe) resetLB(diePe);
1212
1213       CKLOCMGR_LOOP(mgr->startInserting(););
1214
1215
1216       if(CmiNumPartition()==1){
1217         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1218       }else{
1219         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1220         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1221       }
1222 #endif
1223     }
1224
1225     // loally remove all array elements
1226     void CkMemCheckPT::removeArrayElements()
1227     {
1228 #if CMK_MEM_CHECKPOINT
1229       int len = ckTable.length();
1230       double curTime = CmiWallTimer();
1231       if (CkMyPe() == thisFailedPe) 
1232         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1233       stage = (char*)"removeArrayElements";
1234       startTime = curTime;
1235
1236       //  if (cpCallback.isInvalid()) 
1237       //          CkPrintf("invalid pe %d\n",CkMyPe());
1238       //          CkAbort("Didn't set restart callback\n");;
1239       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1240
1241       // get rid of all buffering and remote recs
1242       // including destorying all array elements
1243 #if CK_NO_PROC_POOL  
1244       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1245 #else
1246       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1247 #endif
1248       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1249 #endif
1250     }
1251
1252     // flush state in reduction manager
1253     void CkMemCheckPT::resetReductionMgr()
1254     {
1255       if (CkMyPe() == thisFailedPe) 
1256         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1257       stage = (char *)"resetReductionMgr";
1258       int numGroups = CkpvAccess(_groupIDTable)->size();
1259       for(int i=0;i<numGroups;i++) {
1260         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1261         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1262         obj->flushStates();
1263         obj->ckJustMigrated();
1264       }
1265       // reset again
1266       //CpvAccess(_qd)->flushStates();
1267       if(CmiNumPartition()==1){
1268         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1269       }
1270       else{
1271         if (CkMyPe() == thisFailedPe) 
1272           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1273         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1274       }
1275     }
1276
1277     // recover the lost buddies
1278     void CkMemCheckPT::recoverBuddies()
1279     {
1280       int idx;
1281       int len = ckTable.length();
1282       // ready to flush reduction manager
1283       // cannot be CkMemCheckPT::restart because destory will modify states
1284       double curTime = CmiWallTimer();
1285       if (CkMyPe() == thisFailedPe)
1286         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1287       stage = (char *)"recoverBuddies";
1288       if (CkMyPe() == thisFailedPe)
1289         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1290       startTime = curTime;
1291
1292       // recover buddies
1293       expectCount = 0;
1294 #if !CMK_CHKP_ALL
1295       for (idx=0; idx<len; idx++) {
1296         CkCheckPTInfo *entry = ckTable[idx];
1297         if (entry->pNo == thisFailedPe) {
1298 #if CK_NO_PROC_POOL
1299           // find a new buddy
1300           int budPe = BuddyPE(CkMyPe());
1301 #else
1302           int budPe = thisFailedPe;
1303 #endif
1304           CkArrayCheckPTMessage *msg = entry->getCopy();
1305           msg->bud1 = budPe;
1306           msg->bud2 = CkMyPe();
1307           msg->cp_flag = 0;            // not checkpointing
1308           thisProxy[budPe].recoverEntry(msg);
1309           expectCount ++;
1310         }
1311       }
1312 #else
1313       //send to failed pe
1314       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1315 #if CK_NO_PROC_POOL
1316         // find a new buddy
1317         int budPe = BuddyPE(CkMyPe());
1318 #else
1319         int budPe = thisFailedPe;
1320 #endif
1321         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1322         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1323         msg->cp_flag = 0;            // not checkpointing
1324         msg->bud1 = budPe;
1325         msg->bud2 = CkMyPe();
1326         thisProxy[budPe].recoverEntry(msg);
1327         expectCount ++;
1328       }
1329 #endif
1330
1331       if (expectCount == 0) {
1332         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1333       }
1334       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1335     }
1336
1337     void CkMemCheckPT::gotData()
1338     {
1339       ackCount ++;
1340       if (ackCount == expectCount) {
1341         ackCount = 0;
1342         expectCount = -1;
1343         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1344         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1345       }
1346     }
1347
1348     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1349     {
1350
1351       for (int i=0; i<n; i++) {
1352         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1353         mgr->updateLocation(idx[i], nowOnPe);
1354       }
1355       thisProxy[nowOnPe].gotReply();
1356     }
1357
1358     // restore array elements
1359     void CkMemCheckPT::recoverArrayElements()
1360     {
1361       double curTime = CmiWallTimer();
1362       int len = ckTable.length();
1363       CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1364       stage = (char *)"recoverArrayElements";
1365       startTime = curTime;
1366       int flag = 0;
1367       // recover all array elements
1368       int count = 0;
1369
1370 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1371       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1372       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1373 #endif
1374
1375 #if !CMK_CHKP_ALL
1376       for (int idx=0; idx<len; idx++)
1377       {
1378         CkCheckPTInfo *entry = ckTable[idx];
1379 #if CK_NO_PROC_POOL
1380         // the bigger one will do 
1381         //    if (CkMyPe() < entry->pNo) continue;
1382         if (!isMaster(entry->pNo)) continue;
1383 #else
1384         // smaller one do it, which has the original object
1385         if (CkMyPe() == entry->pNo+1 || 
1386             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1387 #endif
1388         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1389
1390         entry->updateBuddy(CkMyPe(), entry->pNo);
1391         CkArrayCheckPTMessage *msg = entry->getCopy();
1392         // gzheng
1393         //thisProxy[CkMyPe()].inmem_restore(msg);
1394         inmem_restore(msg);
1395 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1396         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1397         int homePe = mgr->homePe(msg->index);
1398         if (homePe != CkMyPe()) {
1399           gmap[homePe].push_back(msg->locMgr);
1400           imap[homePe].push_back(msg->index);
1401         }
1402 #endif
1403         CkFreeMsg(msg);
1404         count ++;
1405       }
1406 #else
1407       char * packData;
1408       if(CmiNumPartition()==1){
1409         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1410         packData = (char *)msg->packData;
1411       }
1412       else{
1413         int pointer = CpvAccess(curPointer);
1414         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1415         packData = msg->packData;
1416       }
1417 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1418       recoverAll(packData,gmap,imap);
1419 #else
1420       recoverAll(packData);
1421 #endif
1422 #endif
1423 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1424       for (int i=0; i<CkNumPes(); i++) {
1425         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1426           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1427           flag++;       
1428         }
1429       }
1430       delete [] imap;
1431       delete [] gmap;
1432 #endif
1433       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1434       CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1435       //if (CkMyPe() == thisFailedPe)
1436
1437       CKLOCMGR_LOOP(mgr->doneInserting(););
1438
1439       // _crashedNode = -1;
1440       CpvAccess(_crashedNode) = -1;
1441 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1442       if (CkMyPe() == 0)
1443         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1444 #else
1445       if(flag == 0)
1446       {
1447         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1448       }
1449 #endif
1450     }
1451
1452     void CkMemCheckPT::gotReply(){
1453       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1454     }
1455
1456     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1457 #if CMK_CHKP_ALL
1458       PUP::fromMem p(packData);
1459       int numElements;
1460       p|numElements;
1461       if(p.isUnpacking()){
1462         for(int i=0;i<numElements;i++){
1463           CkGroupID gID;
1464           CkArrayIndex idx;
1465           p|gID;
1466           p|idx;
1467           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1468           int homePe = mgr->homePe(idx);
1469 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1470           mgr->resume(idx,p,CmiTrue,CmiTrue);
1471 #else
1472           if(CmiNumPartition()==1)
1473             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1474           else{
1475             if(CkMyPe()==thisFailedPe){
1476               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1477             }
1478             else{
1479               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1480             }
1481           }
1482 #endif
1483 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1484           homePe = mgr->homePe(idx);
1485           if (homePe != CkMyPe()) {
1486             gmap[homePe].push_back(gID);
1487             imap[homePe].push_back(idx);
1488           }
1489 #endif
1490         }
1491       }
1492 #endif
1493     }
1494
1495
1496     // on every processor
1497     // turn load balancer back on
1498     void CkMemCheckPT::finishUp()
1499     {
1500       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1501       //CKLOCMGR_LOOP(mgr->doneInserting(););
1502
1503       if (CkMyPe() == thisFailedPe)
1504       {
1505         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1506         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1507         fflush(stdout);
1508       }
1509       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1510       inRestarting = 0;
1511       maxIter = -1;
1512 #if CMK_CONVERSE_MPI    
1513       if(CmiNumPartition()!=1){
1514         CpvAccess(recvdProcChkp) = 0;
1515         CpvAccess(recvdArrayChkp) = 0;
1516         CpvAccess(curPointer)^=1;
1517         //notify my replica, restart is done
1518         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1519           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1520           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1521           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1522         }
1523       }
1524       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1525         lastPingTime = CmiWallTimer();
1526         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1527       }
1528 #endif
1529
1530 #if CK_NO_PROC_POOL
1531 #if NODE_CHECKPOINT
1532       int numnodes = CmiNumPhysicalNodes();
1533 #else
1534       int numnodes = CkNumPes();
1535 #endif
1536       if (numnodes-totalFailed() <=2) {
1537         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1538         _memChkptOn = 0;
1539       }
1540 #endif
1541     }
1542
1543     void CkMemCheckPT::recoverFromSoftFailure()
1544     {
1545       inRestarting = 0;
1546       maxIter = -1;
1547       CpvAccess(recvdRemote) = 0;
1548       CpvAccess(recvdLocal) = 0;
1549       CpvAccess(localChkpDone) = 0;
1550       CpvAccess(remoteChkpDone) = 0;
1551       CpvAccess(remoteReady) = 0;
1552       CpvAccess(localReady) = 0;
1553       inCheckpointing = 0;
1554       notifyReplica = 0;
1555       CpvAccess(remoteStarted) = 0;
1556       CpvAccess(localStarted) = 0;
1557       CpvAccess(_remoteCrashedNode) = -1;
1558       CkMemCheckPT::replicaAlive = 1;
1559       inCheckpointing = 0;
1560       notifyReplica = 0;
1561       if(CkMyPe() == 0){
1562         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1563       }
1564       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1565     }
1566     // called only on 0
1567     void CkMemCheckPT::quiescence(CkCallback &cb)
1568     {
1569       static int pe_count = 0;
1570       pe_count ++;
1571       CmiAssert(CkMyPe() == 0);
1572       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1573       if (pe_count == CkNumPes()) {
1574         pe_count = 0;
1575         cb.send();
1576       }
1577     }
1578
1579     // User callable function - to start a checkpoint
1580     // callback cb is used to pass control back
1581     void CkStartMemCheckpoint(CkCallback &cb)
1582     {
1583 #if CMK_MEM_CHECKPOINT
1584       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1585       /*if (_memChkptOn == 0) {
1586         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1587         cb.send();
1588         return;
1589         }*/
1590       if (CkInRestarting()) {
1591         // trying to checkpointing during restart
1592         cb.send();
1593         return;
1594       }
1595       // store user callback and user data
1596       CkMemCheckPT::cpCallback = cb;
1597
1598
1599       //send to my replica that checkpoint begins 
1600       if(CkReplicaAlive()==1){
1601         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1602         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1603         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1604       }
1605       CpvAccess(localStarted) = 1;
1606       // broadcast to start check pointing
1607       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1608         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1609         checkptMgr.doItNow(CkMyPe());
1610       }
1611 #else
1612       // when mem checkpoint is disabled, invike cb immediately
1613       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1614       cb.send();
1615 #endif
1616     }
1617
1618     void CkRestartCheckPoint(int diePe)
1619     {
1620       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1621       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1622       // broadcast
1623       checkptMgr.restart(diePe);
1624     }
1625
1626     static int _diePE = -1;
1627
1628     // callback function used locally by ccs handler
1629     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1630     {
1631       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1632       CkRestartCheckPoint(_diePE);
1633     }
1634
1635
1636     // called on crashed PE
1637     static void restartBeginHandler(char *msg)
1638     {
1639       CmiFree(msg);
1640 #if CMK_MEM_CHECKPOINT
1641 #if CMK_USE_BARRIER
1642       if(CkMyPe()!=_diePE){
1643         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1644         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1645         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1646         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1647       }else{
1648         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1649         CkRestartCheckPointCallback(NULL, NULL);
1650       }
1651 #else
1652       static int count = 0;
1653       CmiAssert(CkMyPe() == _diePE);
1654       count ++;
1655       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1656         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1657         CkRestartCheckPointCallback(NULL, NULL);
1658         count = 0;
1659       }
1660 #endif
1661 #endif
1662     }
1663
1664     extern void _discard_charm_message();
1665     extern void _resume_charm_message();
1666
1667     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1668       return data;
1669     }
1670
1671     static void restartBcastHandler(char *msg)
1672     {
1673 #if CMK_MEM_CHECKPOINT
1674       // advance phase counter
1675       CkMemCheckPT::inRestarting = 1;
1676       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1677       // gzheng
1678       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1679
1680       if (CkMyPe()==_diePE)
1681         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1682
1683       // reset QD counters
1684       /*  gzheng
1685           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1686        */
1687
1688       /*  gzheng
1689           if (CkMyPe()==_diePE)
1690           CkRestartCheckPointCallback(NULL, NULL);
1691        */
1692       CmiFree(msg);
1693
1694       _resume_charm_message();
1695
1696       // reduction
1697       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1698       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1699 #if CMK_USE_BARRIER
1700       //CmiPrintf("before reduce\n");   
1701       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1702       //CmiPrintf("after reduce\n");    
1703 #else
1704       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1705 #endif 
1706       checkpointed = 0;
1707 #endif
1708     }
1709
1710     extern void _initDone();
1711
1712     bool compare(char * buf1, char *buf2){
1713       if(CkMyPe()==0)
1714         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1715       PUP::checker pchecker(buf1,buf2);
1716       pchecker.skip();
1717       int numElements;
1718       pchecker|numElements;
1719       for(int i=0;i<numElements;i++){
1720         CkGroupID gID;
1721         CkArrayIndex idx;
1722
1723         pchecker|gID;
1724         pchecker|idx;
1725
1726         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1727         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1728       }
1729       if(CkMyPe()==0)
1730         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1731       //int fault_num = pchecker.getFaultNum();
1732       bool result = pchecker.getResult();
1733       /*if(!result){
1734         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1735       }*/
1736       return result;
1737     }
1738     int getChecksum(char * buf){
1739       PUP::checker pchecker(buf);
1740       pchecker.skip();
1741       int numElements;
1742       pchecker|numElements;
1743 //      CkPrintf("num %d\n",numElements);
1744       for(int i=0;i<numElements;i++){
1745         CkGroupID gID;
1746         CkArrayIndex idx;
1747
1748         pchecker|gID;
1749         pchecker|idx;
1750
1751         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1752         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1753       }
1754       return pchecker.getChecksum();
1755     }
1756
1757     static void recvRemoteChkpHandler(char *msg){
1758       CpvAccess(remoteChkpDone) = 1;
1759       if(CpvAccess(use_checksum)){
1760         if(CkMyPe()==0)
1761           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1762         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1763         CpvAccess(recvdRemote) = 1;
1764         if(CpvAccess(recvdLocal)==1){
1765           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1766             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1767           }
1768           else{
1769             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1770           }
1771         }
1772       }else{
1773         envelope *env = (envelope *)msg;
1774         CkUnpackMessage(&env);
1775         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1776         if(CpvAccess(recvdLocal)==1){
1777           int pointer = CpvAccess(curPointer);
1778           int size = CpvAccess(chkpBuf)[pointer]->len;
1779           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1780             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1781           }else
1782           {
1783             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1784           }
1785           delete chkpMsg;
1786           if(CkMyPe()==0)
1787             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1788         }else{
1789           CpvAccess(recvdRemote) = 1;
1790           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1791           CpvAccess(buddyBuf) = chkpMsg;
1792         }
1793       }
1794     }
1795
1796     static void replicaRecoverHandler(char *msg){
1797       //fflush(stdout);
1798       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1799       CpvAccess(remoteChkpDone) = 1;
1800       if(CpvAccess(localChkpDone) == 1)
1801         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1802       CmiFree(msg);
1803     }
1804
1805     static void replicaChkpDoneHandler(char *msg){
1806       CpvAccess(remoteChkpDone) = 1;
1807       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1808       if(CpvAccess(localChkpDone) == 1)
1809         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1810       CmiFree(msg);
1811     }
1812
1813     static void replicaDieHandler(char * msg){
1814 #if CMK_CONVERSE_MPI
1815       //broadcast to every one in my replica
1816       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1817       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1818 #endif
1819     }
1820
1821     static void replicaChkpStartHandler(char * msg){
1822       CpvAccess(remoteStarted) =1;
1823       if(CpvAccess(localStarted)==1){    
1824         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1825         checkptMgr.doItNow(0);
1826       }
1827     }
1828
1829
1830     static void replicaDieBcastHandler(char *msg){
1831       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1832       CpvAccess(_remoteCrashedNode) = diePe;
1833       if(CkMyPe()==diePe){
1834         CmiPrintf("pe %d in replicad word die\n",diePe);
1835         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1836         fflush(stdout);
1837       }
1838       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1839         find_spare_mpirank(diePe,CmiMyPartition()^1);
1840       }
1841       //broadcast to my partition to get local max iter
1842       if(CpvAccess(resilience)!=1){
1843         CkMemCheckPT::replicaAlive = 0;
1844         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1845       }
1846       CmiFree(msg);
1847     }
1848
1849     static void recoverRemoteProcDataHandler(char *msg){
1850       envelope *env = (envelope *)msg;
1851       CkUnpackMessage(&env);
1852       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1853
1854       //store the checkpoint
1855       int pointer = procMsg->pointer;
1856
1857
1858       if(CkMyPe()==CpvAccess(_crashedNode)){
1859         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1860         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1861         PUP::fromMem p(procMsg->packData);
1862         _handleProcData(p,CmiTrue);
1863         _initDone();
1864       }
1865       else{
1866         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1867         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1868         //_handleProcData(p,CmiFalse);
1869       }
1870       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1871       CKLOCMGR_LOOP(mgr->startInserting(););
1872
1873       CpvAccess(recvdProcChkp) =1;
1874       if(CpvAccess(recvdArrayChkp)==1){
1875         _resume_charm_message();
1876         _diePE = CpvAccess(_crashedNode);
1877         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1878         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1879         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1880 #if CMK_USE_BARRIER
1881       //CmiPrintf("before reduce\n");   
1882         if(CpvAccess(resilience)==1){
1883           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1884         }else
1885           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1886       //CmiPrintf("after reduce\n");    
1887 #else
1888         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1889 #endif 
1890       }
1891     }
1892
1893     static void recoverRemoteArrayDataHandler(char *msg){
1894       envelope *env = (envelope *)msg;
1895       CkUnpackMessage(&env);
1896       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1897
1898       //store the checkpoint
1899       int pointer = chkpMsg->pointer;
1900       CpvAccess(curPointer) = pointer;
1901       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1902       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1903       CpvAccess(recvdArrayChkp) =1;
1904       CkMemCheckPT::inRestarting = 1;
1905       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1906         _resume_charm_message();
1907         _diePE = CpvAccess(_crashedNode);
1908         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1909         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1910         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1911         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1912 #if CMK_USE_BARRIER
1913       //CmiPrintf("before reduce\n");   
1914         if(CpvAccess(resilience)==1){
1915           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1916         }else
1917           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1918       //CmiPrintf("after reduce\n");    
1919 #else
1920         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1921 #endif 
1922       }
1923     }
1924
1925     static void recvPhaseHandler(char * msg)
1926     {
1927       CpvAccess(_curRestartPhase)--;
1928       CkMemCheckPT::inRestarting = 1;
1929       CmiFree(msg);
1930       //notify the buddy in the replica now i can receive the checkpoint msg
1931       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1932         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1933         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1934         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1935         //timer start
1936         if(CpvAccess(_crashedNode)==CkMyPe())
1937           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1938         if(CpvAccess(_crashedNode)==CmiMyPe())
1939           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1940       }else{
1941         CpvAccess(curPointer)^=1;
1942         CkMemCheckPT::inRestarting = 1;
1943         _resume_charm_message();
1944         _diePE = CpvAccess(_crashedNode);
1945       }
1946     }
1947     
1948     static void askRecoverDataHandler(char * msg){
1949       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1950         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1951       if(CpvAccess(resilience)!=1){
1952         CpvAccess(remoteReady)=1;
1953         if(CpvAccess(localReady)==1){
1954           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1955           {     
1956             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1957             CkPackMessage(&env);
1958             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1959             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1960             CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
1961           }
1962           //send the array checkpoint data
1963           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1964           CkPackMessage(&env);
1965           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1966           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1967           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1968             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1969         }
1970       }else{
1971         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
1972         int pointer = CpvAccess(curPointer)^1;
1973         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1974         procMsg->pointer = pointer;
1975         envelope * env = (envelope *)(UsrToEnv(procMsg));
1976         CkPackMessage(&env);
1977         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1978         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1979         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
1980         
1981         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1982         arrayMsg->pointer = pointer;
1983         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1984         CkPackMessage(&env1);
1985         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1986         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1987         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1988       }
1989     }
1990     // called on crashed processor
1991     static void recoverProcDataHandler(char *msg)
1992     {
1993 #if CMK_MEM_CHECKPOINT
1994       int i;
1995       envelope *env = (envelope *)msg;
1996       CkUnpackMessage(&env);
1997       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1998       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1999       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2000       PUP::fromMem p(procMsg->packData);
2001       _handleProcData(p,CmiTrue);
2002
2003       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2004       // gzheng
2005       CKLOCMGR_LOOP(mgr->startInserting(););
2006
2007       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2008       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2009       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2010       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2011
2012       _initDone();
2013       //   CpvAccess(_qd)->flushStates();
2014       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2015 #endif
2016     }
2017     //for replica, got the phase number from my neighbor processor in the current partition
2018     static void askPhaseHandler(char *msg)
2019     {
2020 #if CMK_MEM_CHECKPOINT
2021       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2022       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2023       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2024       CmiSetHandler(msg, recvPhaseHandlerIdx);
2025       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2026 #endif
2027     }
2028     // called on its backup processor
2029     // get backup message buffer and sent to crashed processor
2030     static void askProcDataHandler(char *msg)
2031     {
2032 #if CMK_MEM_CHECKPOINT
2033       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2034       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2035       if (CpvAccess(procChkptBuf) == NULL)  {
2036         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2037         CkAbort("no checkpoint found");
2038       }
2039       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2040       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2041
2042       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2043
2044       CkPackMessage(&env);
2045       CmiSetHandler(env, recoverProcDataHandlerIdx);
2046       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2047       CpvAccess(procChkptBuf) = NULL;
2048       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2049 #endif
2050     }
2051
2052     // called on PE 0
2053     void qd_callback(void *m)
2054     {
2055       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2056       fflush(stdout);
2057       CkFreeMsg(m);
2058       if(CmiNumPartition()==1){
2059 #ifdef CMK_SMP
2060         for(int i=0;i<CmiMyNodeSize();i++){
2061           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2062           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2063           CmiSetHandler(msg, askProcDataHandlerIdx);
2064           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2065           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2066         }
2067         return;
2068 #endif
2069         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2070         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2071         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2072         CmiSetHandler(msg, askProcDataHandlerIdx);
2073         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2074         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2075       }
2076       else{
2077         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2078         CmiSetHandler(msg, recvPhaseHandlerIdx);
2079         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2080       }
2081     }
2082
2083     // on crashed node
2084     void CkMemRestart(const char *dummy, CkArgMsg *args)
2085     {
2086 #if CMK_MEM_CHECKPOINT
2087       _diePE = CmiMyNode();
2088       CpvAccess( _crashedNode )= CmiMyNode();
2089       CkMemCheckPT::inRestarting = 1;
2090       _discard_charm_message();
2091
2092       /*if(CmiMyRank()==0){
2093         CkCallback cb(qd_callback);
2094         CkStartQD(cb);
2095         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2096         }*/
2097       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2098       if(CmiNumPartition()==1){
2099         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2100         restartT = CmiWallTimer();
2101         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2102         fflush(stdout);
2103         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2104         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2105         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2106         CmiSetHandler(msg, askProcDataHandlerIdx);
2107         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2108         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2109       }
2110       else{
2111         CkCallback cb(qd_callback);
2112         CkStartQD(cb);
2113       }
2114 #else
2115       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2116 #endif
2117     }
2118
2119     // can be called in other files
2120     // return true if it is in restarting
2121     extern "C"
2122       int CkInRestarting()
2123       {
2124 #if CMK_MEM_CHECKPOINT
2125         if (CpvAccess( _crashedNode)!=-1) return 1;
2126         // gzheng
2127         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2128         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2129         return CkMemCheckPT::inRestarting;
2130 #else
2131         return 0;
2132 #endif
2133       }
2134
2135     extern "C"
2136       int CkReplicaAlive()
2137       {
2138         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2139         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2140         return CkMemCheckPT::replicaAlive;
2141
2142         /*if(CkMemCheckPT::replicaDead==1)
2143           return 0;
2144           else          
2145           return 1;*/
2146       }
2147
2148     extern "C"
2149       int CkInCheckpointing()
2150       {
2151         return CkMemCheckPT::inCheckpointing;
2152       }
2153
2154     extern "C"
2155       void CkSetInLdb(){
2156 #if CMK_MEM_CHECKPOINT
2157         CkMemCheckPT::inLoadbalancing = 1;
2158 #endif
2159       }
2160
2161     extern "C"
2162       int CkInLdb(){
2163 #if CMK_MEM_CHECKPOINT
2164         return CkMemCheckPT::inLoadbalancing;
2165 #endif
2166         return 0;
2167       }
2168
2169     extern "C"
2170       void CkResetInLdb(){
2171 #if CMK_MEM_CHECKPOINT
2172         CkMemCheckPT::inLoadbalancing = 0;
2173 #endif
2174       }
2175
2176     /*****************************************************************************
2177       module initialization
2178      *****************************************************************************/
2179
2180     static int arg_where = CkCheckPoint_inMEM;
2181
2182 #if CMK_MEM_CHECKPOINT
2183     void init_memcheckpt(char **argv)
2184     {
2185       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2186         arg_where = CkCheckPoint_inDISK;
2187       }
2188       CpvInitialize(int, use_checksum);
2189       CpvInitialize(int, resilience);
2190       CpvAccess(use_checksum)=0;
2191       CpvAccess(resilience)=0;
2192       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2193         CpvAccess(use_checksum)=1;
2194       }
2195       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2196         CpvAccess(resilience)=1;
2197       }
2198       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2199         CpvAccess(resilience)=2;
2200       }
2201       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2202         CpvAccess(resilience)=3;
2203       }
2204       // initiliazing _crashedNode variable
2205       CpvInitialize(int, _crashedNode);
2206       CpvInitialize(int, _remoteCrashedNode);
2207       CpvAccess(_crashedNode) = -1;
2208       CpvAccess(_remoteCrashedNode) = -1;
2209       init_FI(argv);
2210     }
2211 #endif
2212
2213     class CkMemCheckPTInit: public Chare {
2214       public:
2215         CkMemCheckPTInit(CkArgMsg *m) {
2216 #if CMK_MEM_CHECKPOINT
2217           if (arg_where == CkCheckPoint_inDISK) {
2218             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2219           }
2220           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2221           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2222 #endif
2223         }
2224     };
2225
2226     static void notifyHandler(char *msg)
2227     {
2228 #if CMK_MEM_CHECKPOINT
2229       CmiFree(msg);
2230       /* immediately increase restart phase to filter old messages */
2231       CpvAccess(_curRestartPhase) ++;
2232       CpvAccess(_qd)->flushStates();
2233       _discard_charm_message();
2234
2235 #endif
2236     }
2237
2238     extern "C"
2239       void notify_crash(int node)
2240       {
2241 #ifdef CMK_MEM_CHECKPOINT
2242         CpvAccess( _crashedNode) = node;
2243 #ifdef CMK_SMP
2244         for(int i=0;i<CkMyNodeSize();i++){
2245           CpvAccessOther(_crashedNode,i)=node;
2246         }
2247 #endif
2248         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2249         CkMemCheckPT::inRestarting = 1;
2250
2251         // this may be in interrupt handler, send a message to reset QD
2252         int pe = CmiNodeFirst(CkMyNode());
2253         for(int i=0;i<CkMyNodeSize();i++){
2254           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2255           CmiSetHandler(msg, notifyHandlerIdx);
2256           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2257         }
2258 #endif
2259       }
2260
2261     extern "C" void (*notify_crash_fn)(int node);
2262
2263
2264 #if CMK_CONVERSE_MPI
2265     void buddyDieHandler(char *msg)
2266     {
2267 #if CMK_MEM_CHECKPOINT
2268       // notify
2269       CkMemCheckPT::inRestarting = 1;
2270       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2271       notify_crash(diepe);
2272       // send message to crash pe to let it restart
2273       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2274       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2275       int buddy = obj->BuddyPE(CmiMyPe());
2276       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2277       if (buddy == diepe)  {
2278         mpi_restart_crashed(diepe, newrank);
2279       }
2280 #endif
2281     }
2282
2283     void pingHandler(void *msg)
2284     {
2285       lastPingTime = CmiWallTimer();
2286       CmiFree(msg);
2287     }
2288
2289     void pingCheckHandler()
2290     {
2291 #if CMK_MEM_CHECKPOINT
2292       double now = CmiWallTimer();
2293       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2294         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2295         int i, pe, buddy;
2296         // tell everyone the buddy dies
2297         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2298         for (i = 1; i < CmiNumPes(); i++) {
2299           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2300           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2301         }
2302         buddy = pe;
2303         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2304         fflush(stdout);
2305         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2306         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2307         CmiSetHandler(msg, buddyDieHandlerIdx);
2308         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2309         //send to everyone in the other world
2310         if(CmiNumPartition()!=1){
2311           //for(int i=0;i<CmiNumPes();i++){
2312             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2313             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2314             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2315             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2316           //}
2317         }
2318       }
2319         else 
2320           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2321 #endif
2322       }
2323
2324       void pingBuddy()
2325       {
2326 #if CMK_MEM_CHECKPOINT
2327         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2328         if (obj) {
2329           int buddy = obj->BuddyPE(CkMyPe());
2330           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2331           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2332           CmiSetHandler(msg, pingHandlerIdx);
2333           CmiGetRestartPhase(msg) = 9999;
2334           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2335         }
2336         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2337 #endif
2338       }
2339 #endif
2340
2341       // initproc
2342       void CkRegisterRestartHandler( )
2343       {
2344 #if CMK_MEM_CHECKPOINT
2345         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2346         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2347         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2348         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2349         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2350         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2351
2352         //for replica
2353         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2354         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2355         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2356         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2357         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2358         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2359         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2360         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2361         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2362         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2363
2364 #if CMK_CONVERSE_MPI
2365         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2366         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2367         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2368 #endif
2369
2370         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2371         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2372         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2373         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2374         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2375         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2376         CpvInitialize(int,curPointer);
2377         CpvInitialize(int,recvdLocal);
2378         CpvInitialize(int,localChkpDone);
2379         CpvInitialize(int,remoteChkpDone);
2380         CpvInitialize(int,remoteStarted);
2381         CpvInitialize(int,localStarted);
2382         CpvInitialize(int,localReady);
2383         CpvInitialize(int,remoteReady);
2384         CpvInitialize(int,recvdRemote);
2385         CpvInitialize(int,recvdProcChkp);
2386         CpvInitialize(int,localChecksum);
2387         CpvInitialize(int,remoteChecksum);
2388         CpvInitialize(int,recvdArrayChkp);
2389
2390         CpvAccess(procChkptBuf) = NULL;
2391         CpvAccess(buddyBuf) = NULL;
2392         CpvAccess(recoverProcBuf) = NULL;
2393         CpvAccess(recoverArrayBuf) = NULL;
2394         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2395         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2396         CpvAccess(chkpBuf)[0] = NULL;
2397         CpvAccess(chkpBuf)[1] = NULL;
2398         CpvAccess(localProcChkpBuf)[0] = NULL;
2399         CpvAccess(localProcChkpBuf)[1] = NULL;
2400
2401         CpvAccess(curPointer) = 0;
2402         CpvAccess(recvdLocal) = 0;
2403         CpvAccess(localChkpDone) = 0;
2404         CpvAccess(remoteChkpDone) = 0;
2405         CpvAccess(remoteStarted) = 0;
2406         CpvAccess(localStarted) = 0;
2407         CpvAccess(localReady) = 0;
2408         CpvAccess(remoteReady) = 0;
2409         CpvAccess(recvdRemote) = 0;
2410         CpvAccess(recvdProcChkp) = 0;
2411         CpvAccess(localChecksum) = 0;
2412         CpvAccess(remoteChecksum) = 0;
2413         CpvAccess(recvdArrayChkp) = 0;
2414
2415         notify_crash_fn = notify_crash;
2416
2417 #if ! CMK_CONVERSE_MPI
2418         // print pid to kill
2419         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2420         //  sleep(4);
2421 #endif
2422 #endif
2423       }
2424
2425
2426       extern "C"
2427         int CkHasCheckpoints()
2428         {
2429           return checkpointed;
2430         }
2431
2432       /// @todo: the following definitions should be moved to a separate file containing
2433       // structures and functions about fault tolerance strategies
2434
2435       /**
2436        *  * @brief: function for killing a process                                             
2437        *   */
2438 #ifdef CMK_MEM_CHECKPOINT
2439 #if CMK_HAS_GETPID
2440       void killLocal(void *_dummy,double curWallTime){
2441         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2442         if(CmiWallTimer()<killTime-1){
2443           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2444         }else{ 
2445 #if CMK_CONVERSE_MPI
2446           CkDieNow();
2447 #else 
2448           kill(getpid(),SIGKILL);                                               
2449 #endif
2450         }              
2451       } 
2452 #else
2453       void killLocal(void *_dummy,double curWallTime){
2454         CmiAbort("kill() not supported!");
2455       }
2456 #endif
2457 #endif
2458
2459 #ifdef CMK_MEM_CHECKPOINT
2460       /**
2461        * @brief: reads the file with the kill information
2462        */
2463       void readKillFile(){
2464         FILE *fp=fopen(killFile,"r");
2465         if(!fp){
2466           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2467           return;
2468         }
2469         int proc;
2470         double sec;
2471         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2472           if(proc == CkMyNode() && CkMyRank() == 0){
2473             killTime = CmiWallTimer()+sec;
2474             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2475             CcdCallFnAfter(killLocal,NULL,sec*1000);
2476           }
2477         }
2478         fclose(fp);
2479       }
2480
2481 #if ! CMK_CONVERSE_MPI
2482       void CkDieNow()
2483       {
2484 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2485         // ignored for non-mpi version
2486         CmiPrintf("[%d] die now.\n", CmiMyPe());
2487         killTime = CmiWallTimer()+0.001;
2488         CcdCallFnAfter(killLocal,NULL,1);
2489 #endif
2490       }
2491 #endif
2492
2493 #endif
2494
2495 #include "CkMemCheckpoint.def.h"
2496
2497