minor
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117  }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, recvdArrayChkp);
131 CpvDeclare(int, recvdProcChkp);
132
133 bool compare(char * buf1, char * buf2);
134 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
135 // Converse function handles
136 static int askPhaseHandlerIdx;
137 static int recvPhaseHandlerIdx;
138 static int askProcDataHandlerIdx;
139 static int restartBcastHandlerIdx;
140 static int recoverProcDataHandlerIdx;
141 static int restartBeginHandlerIdx;
142 static int recvRemoteChkpHandlerIdx;
143 static int replicaDieHandlerIdx;
144 static int replicaDieBcastHandlerIdx;
145 static int replicaRecoverHandlerIdx;
146 static int recoverRemoteProcDataHandlerIdx;
147 static int recoverRemoteArrayDataHandlerIdx;
148 static int notifyHandlerIdx;
149 // compute the backup processor
150 // FIXME: avoid crashed processors
151 #if CMK_CONVERSE_MPI
152 static int pingHandlerIdx;
153 static int pingCheckHandlerIdx;
154 static int buddyDieHandlerIdx;
155 static double lastPingTime = -1;
156
157 extern "C" void mpi_restart_crashed(int pe, int rank);
158 extern "C" int  find_spare_mpirank(int pe,int partition);
159
160 void pingBuddy();
161 void pingCheckHandler();
162
163 #endif
164 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
165
166 inline int CkMemCheckPT::BuddyPE(int pe)
167 {
168   int budpe;
169 #if NODE_CHECKPOINT
170     // buddy is the processor with same rank on the next physical node
171   int r1 = CmiPhysicalRank(pe);
172   int budnode = CmiPhysicalNodeID(pe);
173   do {
174     budnode = (budnode+1)%CmiNumPhysicalNodes();
175     int *pelist;
176     int num;
177     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
178     budpe = pelist[r1 % num];
179   } while (isFailed(budpe));
180   if (budpe == pe) {
181     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
182     CmiAbort("Failed to find a buddy processor");
183   }
184 #else
185   budpe = pe;
186   while (budpe == pe || isFailed(budpe)) 
187           budpe = (budpe+1)%CkNumPes();
188 #endif
189   return budpe;
190 }
191
192 // called in array element constructor
193 // choose and register with 2 buddies for checkpoiting 
194 #if CMK_MEM_CHECKPOINT
195 void ArrayElement::init_checkpt() {
196         if (_memChkptOn == 0) return;
197         if (CkInRestarting()) {
198           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
199         }
200         // only master init checkpoint
201         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
202
203         budPEs[0] = CkMyPe();
204         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
205         CmiAssert(budPEs[0] != budPEs[1]);
206         // inform checkPTMgr
207         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
208         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
209         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
210         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
211 }
212 #endif
213
214 // entry function invoked by checkpoint mgr asking for checkpoint data
215 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
216 #if CMK_MEM_CHECKPOINT
217 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
218 //char index[128];   thisIndexMax.sprint(index);
219 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
220   CkLocMgr *locMgr = thisArray->getLocMgr();
221   CmiAssert(myRec!=NULL);
222   int size;
223   {
224         PUP::sizer p;
225         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
226         size = p.size();
227   }
228   int packSize = size/sizeof(double) +1;
229   CkArrayCheckPTMessage *msg =
230                  new (packSize, 0) CkArrayCheckPTMessage;
231   msg->len = size;
232   msg->index =thisIndexMax;
233   msg->aid = thisArrayID;
234   msg->locMgr = locMgr->getGroupID();
235   msg->cp_flag = 1;
236   {
237         PUP::toMem p(msg->packData);
238         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
239   }
240
241   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
242   checkptMgr.recvData(msg, 2, budPEs);
243   delete m;
244 #endif
245 }
246
247 // checkpoint holder class - for memory checkpointing
248 class CkMemCheckPTInfo: public CkCheckPTInfo
249 {
250   CkArrayCheckPTMessage *ckBuffer;
251 public:
252   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
253                     CkCheckPTInfo(a, loc, idx, pno)
254   {
255     ckBuffer = NULL;
256   }
257   ~CkMemCheckPTInfo() 
258   {
259     if (ckBuffer) delete ckBuffer; 
260   }
261   inline void updateBuffer(CkArrayCheckPTMessage *data) 
262   {
263     CmiAssert(data!=NULL);
264     if (ckBuffer) delete ckBuffer;
265     ckBuffer = data;
266   }    
267   inline CkArrayCheckPTMessage * getCopy()
268   {
269     if (ckBuffer == NULL) {
270       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
271       CmiAbort("Abort!");
272     }
273     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
274   }     
275   inline void updateBuddy(int b1, int b2) {
276      CmiAssert(ckBuffer);
277      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      CmiAssert(ckBuffer);
283      return ckBuffer->len; 
284   }
285 };
286
287 // checkpoint holder class - for in-disk checkpointing
288 class CkDiskCheckPTInfo: public CkCheckPTInfo 
289 {
290   char *fname;
291   int bud1, bud2;
292   int len;                      // checkpoint size
293 public:
294   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
295   {
296 #if CMK_USE_MKSTEMP
297     fname = new char[64];
298     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
299     mkstemp(fname);
300 #else
301     fname=tmpnam(NULL);
302 #endif
303     bud1 = bud2 = -1;
304     len = 0;
305   }
306   ~CkDiskCheckPTInfo() 
307   {
308     remove(fname);
309   }
310   inline void updateBuffer(CkArrayCheckPTMessage *data) 
311   {
312     double t = CmiWallTimer();
313     // unpack it
314     envelope *env = UsrToEnv(data);
315     CkUnpackMessage(&env);
316     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
317     FILE *f = fopen(fname,"wb");
318     PUP::toDisk p(f);
319     CkPupMessage(p, (void **)&data);
320     // delay sync to the end because otherwise the messages are blocked
321 //    fsync(fileno(f));
322     fclose(f);
323     bud1 = data->bud1;
324     bud2 = data->bud2;
325     len = data->len;
326     delete data;
327     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
328   }
329   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
330   {
331     CkArrayCheckPTMessage *data;
332     FILE *f = fopen(fname,"rb");
333     PUP::fromDisk p(f);
334     CkPupMessage(p, (void **)&data);
335     fclose(f);
336     data->bud1 = bud1;                          // update the buddies
337     data->bud2 = bud2;
338     return data;
339   }
340   inline void updateBuddy(int b1, int b2) {
341      bud1 = b1; bud2 = b2;
342      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
343      CmiAssert(pNo != CkMyPe());
344   }
345   inline int getSize() { 
346      return len; 
347   }
348 };
349
350 CkMemCheckPT::CkMemCheckPT(int w)
351 {
352   int numnodes = 0;
353 #if NODE_CHECKPOINT
354   numnodes = CmiNumPhysicalNodes();
355 #else
356   numnodes = CkNumPes();
357 #endif
358 #if CK_NO_PROC_POOL
359   if (numnodes <= 2)
360 #else
361   if (numnodes  == 1)
362 #endif
363   {
364     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
365     _memChkptOn = 0;
366   }
367   inRestarting = 0;
368   inCheckpointing = 0;
369   recvCount = peCount = 0;
370   ackCount = 0;
371   expectCount = -1;
372   where = w;
373   replicaAlive = 1;
374 #if CMK_CONVERSE_MPI
375   void pingBuddy();
376   void pingCheckHandler();
377   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
378   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
379 #endif
380   chkpTable[0] = NULL;
381   chkpTable[1] = NULL;
382   maxIter = -1;
383   recvIterCount = 0;
384   localDecided = false;
385 }
386
387 CkMemCheckPT::~CkMemCheckPT()
388 {
389   int len = ckTable.length();
390   for (int i=0; i<len; i++) {
391     delete ckTable[i];
392   }
393 }
394
395 void CkMemCheckPT::pup(PUP::er& p) 
396
397   CBase_CkMemCheckPT::pup(p); 
398   p|cpStarter;
399   p|thisFailedPe;
400   p|failedPes;
401   p|ckCheckPTGroupID;           // recover global variable
402   p|cpCallback;                 // store callback
403   p|where;                      // where to checkpoint
404   p|peCount;
405   if (p.isUnpacking()) {
406     recvCount = 0;
407 #if CMK_CONVERSE_MPI
408     void pingBuddy();
409     void pingCheckHandler();
410     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
411     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
412 #endif
413           maxIter = -1;
414           recvIterCount = 0;
415           localDecided = false;
416   }
417 }
418
419 void CkMemCheckPT::getIter(){
420         localDecided = true;
421         localMaxIter = maxIter+1;
422         contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
423         int elemCount = CkCountChkpSyncElements();
424         if(elemCount == 0){
425                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
426         }
427 }
428
429 //when one replica failes, decide the next chkp iter
430
431 void CkMemCheckPT::recvIter(int iter){
432         if(maxIter<iter){
433                 maxIter=iter;
434         }
435 }
436
437 void CkMemCheckPT::recvMaxIter(int iter){
438         localDecided = false;
439         CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
440 }
441
442 void CkMemCheckPT::reachChkpIter(){
443         recvIterCount++;
444         elemCount = CkCountChkpSyncElements();
445         if(recvIterCount == elemCount){
446                 recvIterCount = 0;
447                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448         }
449 }
450
451 void CkMemCheckPT::startChkp(){
452         CkPrintf("start checkpoint\n");
453         CkStartMemCheckpoint(cpCallback);
454 }
455
456 // called by checkpoint mgr to restore an array element
457 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
458 {
459 #if CMK_MEM_CHECKPOINT
460   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
461   // m->index.print();
462   PUP::fromMem p(m->packData);
463   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
464   CmiAssert(mgr);
465 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
466   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
467 #else
468   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
469 #endif
470
471   // find a list of array elements bound together
472   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
473   CmiAssert(elt);
474   CkLocRec_local *rec = elt->myRec;
475   CkVec<CkMigratable *> list;
476   mgr->migratableList(rec, list);
477   CmiAssert(list.length() > 0);
478   for (int l=0; l<list.length(); l++) {
479     elt = (ArrayElement *)list[l];
480     elt->budPEs[0] = m->bud1;
481     elt->budPEs[1] = m->bud2;
482     //    reset, may not needed now
483     // for now.
484     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
485       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
486       if (c) c->redNo = 0;
487     }
488   }
489 #endif
490 }
491
492 // return 1 if pe is a crashed processor
493 int CkMemCheckPT::isFailed(int pe)
494 {
495   for (int i=0; i<failedPes.length(); i++)
496     if (failedPes[i] == pe) return 1;
497   return 0;
498 }
499
500 // add pe into history list of all failed processors
501 void CkMemCheckPT::failed(int pe)
502 {
503   if (isFailed(pe)) return;
504   failedPes.push_back(pe);
505 }
506
507 int CkMemCheckPT::totalFailed()
508 {
509   return failedPes.length();
510 }
511
512 // create an checkpoint entry for array element of aid with index.
513 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
514 {
515   // error check, no duplicate
516   int idx, len = ckTable.size();
517   for (idx=0; idx<len; idx++) {
518     CkCheckPTInfo *entry = ckTable[idx];
519     if (index == entry->index) {
520       if (loc == entry->locMgr) {
521           // bindTo array elements
522           return;
523       }
524         // for array inheritance, the following check may fail
525         // because ArrayElement constructor of all superclasses are called
526       if (aid == entry->aid) {
527         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
528         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
529       }
530     }
531   }
532   CkCheckPTInfo *newEntry;
533   if (where == CkCheckPoint_inMEM)
534     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
535   else
536     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
537   ckTable.push_back(newEntry);
538   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
539 }
540
541 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
542 {
543 #if !CMK_CHKP_ALL       
544   int buddy = msg->bud1;
545   if (buddy == CkMyPe()) buddy = msg->bud2;
546   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
547   recvData(msg);
548     // ack
549   thisProxy[buddy].gotData();
550 #else
551   chkpTable[0] = NULL;
552   chkpTable[1] = NULL;
553   recvArrayCheckpoint(msg);
554   thisProxy[msg->bud2].gotData();
555 #endif
556 }
557
558 // loop through my checkpoint table and ask checkpointed array elements
559 // to send me checkpoint data.
560 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
561 {
562   checkpointed = 1;
563   cpCallback = cb;
564   cpStarter = starter;
565   inCheckpointing = 1;
566   if (CkMyPe() == cpStarter) {
567     startTime = CmiWallTimer();
568     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
569   }
570 #if CMK_CONVERSE_MPI
571   if(CmiNumPartition()==1)
572 #endif    
573   {
574
575 #if !CMK_CHKP_ALL
576   int len = ckTable.length();
577   for (int i=0; i<len; i++) {
578     CkCheckPTInfo *entry = ckTable[i];
579       // always let the bigger number processor send request
580     //if (CkMyPe() < entry->pNo) continue;
581       // always let the smaller number processor send request, may on same proc
582     if (!isMaster(entry->pNo)) continue;
583       // call inmem_checkpoint to the array element, ask it to send
584       // back checkpoint data via recvData().
585     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
586     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
587   }
588     // if my table is empty, then I am done
589   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
590 #else
591   startArrayCheckpoint();
592 #endif
593         sendProcData();
594   }
595 #if CMK_CONVERSE_MPI
596   else
597   {
598         startCheckpoint();
599   }
600 #endif
601   // pack and send proc level data
602 }
603
604 class MemElementPacker : public CkLocIterator{
605         private:
606                 CkLocMgr *locMgr;
607                 PUP::er &p;
608         public:
609                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
610                 void addLocation(CkLocation &loc){
611                         CkArrayIndexMax idx = loc.getIndex();
612                         CkGroupID gID = locMgr->ckGetGroupID();
613                         p|gID;
614                         p|idx;
615                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
616                 }
617 };
618
619 void pupAllElements(PUP::er &p){
620 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
621         int numElements;
622         if(!p.isUnpacking()){
623                 numElements = CkCountArrayElements();
624         }
625         p | numElements;
626         if(!p.isUnpacking()){
627                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
628         }
629 #endif
630 }
631
632 void CkMemCheckPT::startArrayCheckpoint(){
633 #if CMK_CHKP_ALL
634         int size;
635         {
636                 PUP::sizer psizer;
637                 pupAllElements(psizer);
638                 size = psizer.size();
639         }
640         int packSize = size/sizeof(double)+1;
641  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
642         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
643         msg->len = size;
644         msg->cp_flag = 1;
645         int budPEs[2];
646         msg->bud1=CkMyPe();
647         msg->bud2=ChkptOnPe(CkMyPe());
648         {
649                 PUP::toMem p(msg->packData);
650                 pupAllElements(p);
651         }
652         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
653         if(chkpTable[0]) delete chkpTable[0];
654         chkpTable[0] = msg;
655         //send the checkpoint to my 
656         recvCount++;
657 #endif
658 }
659
660 void CkMemCheckPT::startCheckpoint(){
661 #if CMK_CONVERSE_MPI
662         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
663     CkPrintf("in start checkpointing!!!!\n");
664   int size;
665   {
666     PUP::sizer p;
667     _handleProcData(p,CmiFalse);
668     size = p.size();
669   }
670         CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
671         procMsg->len = size;
672         procMsg->cp_flag = 1;
673         {
674                 PUP::toMem p(procMsg->packData);
675                 _handleProcData(p,CmiFalse);
676         }
677         int pointer = CpvAccess(curPointer);
678         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
679                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
680         
681         {
682                 PUP::sizer psizer;
683                 pupAllElements(psizer);
684                 size = psizer.size();
685         }
686         CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
687         msg->len = size;
688         msg->cp_flag = 1;
689         {
690                 PUP::toMem p(msg->packData);
691                 pupAllElements(p);
692         }
693         pointer = CpvAccess(curPointer);
694         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
695                 CpvAccess(chkpBuf)[pointer] = msg;
696         CkPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
697         if(CkReplicaAlive()==1){
698                 CpvAccess(recvdLocal) = 1;
699                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
700                 CkPackMessage(&env);
701                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
702                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
703         }
704         if(CpvAccess(recvdRemote)==1){
705                 //compare the checkpoint 
706           int size = CpvAccess(chkpBuf)[pointer]->len;
707           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
708                 thisProxy[CkMyPe()].doneComparison(true);
709           }else{
710                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
711                 thisProxy[CkMyPe()].doneComparison(false);
712           }
713                 CkPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
714           delete CpvAccess(buddyBuf);
715   }
716         else{
717                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
718                         {       
719                                 int pointer = CpvAccess(curPointer);
720                                 //send the proc data
721                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
722                                 procMsg->pointer = pointer;
723                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
724                                 CkPackMessage(&env);
725                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
726                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
727                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
728                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
729                                 }
730                         }
731                         //send the array checkpoint data
732                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
733                         msg->pointer = CpvAccess(curPointer);
734                         envelope * env = (envelope *)(UsrToEnv(msg));
735                         CkPackMessage(&env);
736                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
737                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
738                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
739                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
740                         //can continue work, no need to wait for my replica
741                 }
742         }
743 #endif
744 }
745
746 void CkMemCheckPT::doneComparison(bool ret){
747         int _ret = 1;
748         if(!ret){
749         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
750                 _ret = 0;
751         }else{
752                 _ret = 1;
753         }
754         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
755         contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
756 }
757
758 void CkMemCheckPT::doneRComparison(int ret){
759         CpvAccess(recvdRemote) = 0;
760         CpvAccess(recvdLocal) = 0;
761 //      if(CpvAccess(curPointer) == 0){
762         if(ret==CkNumPes()){
763         CpvAccess(curPointer)^=1;
764                 inCheckpointing = 0;
765                 if(CkMyPe() == 0){
766                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
767                 }
768                 CKLOCMGR_LOOP(mgr->resumeFromChkp(););
769         }
770         else{
771         CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
772                 RollBack();
773         }
774 }
775
776 void CkMemCheckPT::RollBack(){
777         //restore group data
778         checkpointed = 0;
779         CkMemCheckPT::inRestarting = 1;
780         int pointer = CpvAccess(curPointer)^1;//use the previous one
781     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
782         PUP::fromMem p(chkpMsg->packData);      
783         
784         //destroy array elements
785         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
786           int numGroups = CkpvAccess(_groupIDTable)->size();
787           for(int i=0;i<numGroups;i++) {
788                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
789                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
790                 obj->flushStates();
791                 obj->ckJustMigrated();
792           }
793         //restore array elements
794         
795         int numElements;
796         p|numElements;
797         
798         if(p.isUnpacking()){
799                 for(int i=0;i<numElements;i++){
800                 //for(int i=0;i<1;i++){
801                         CkGroupID gID;
802                         CkArrayIndex idx;
803                         p|gID;
804                         p|idx;
805                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
806                         CmiAssert(mgr);
807                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
808                 }
809         }
810         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
811         contribute(cb);
812 }
813
814 void CkMemCheckPT::notifyReplicaDie(int pe){
815         //CkPrintf("[%d] receive replica die\n",CkMyPe());
816         replicaAlive = 0;
817         CpvAccess(_remoteCrashedNode) = pe;
818 }
819
820 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
821 {
822 #if CMK_CHKP_ALL
823         int idx = 1;
824         if(msg->bud1 == CkMyPe()){
825                 idx = 0;
826         }
827         int isChkpting = msg->cp_flag;
828         if(isChkpting == 1){
829                 if(chkpTable[idx]) delete chkpTable[idx];
830         }
831         chkpTable[idx] = msg;
832         if(isChkpting){
833                 recvCount++;
834                 if(recvCount == 2){
835                   if (where == CkCheckPoint_inMEM) {
836                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
837                   }
838                   else if (where == CkCheckPoint_inDISK) {
839                         // another barrier for finalize the writing using fsync
840                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
841                         contribute(0,NULL,CkReduction::sum_int,localcb);
842                   }
843                   else
844                         CmiAbort("Unknown checkpoint scheme");
845                   recvCount = 0;
846                 }
847         }
848 #endif
849 }
850
851 // don't handle array elements
852 static inline void _handleProcData(PUP::er &p, CmiBool create)
853 {
854     // save readonlys, and callback BTW
855     CkPupROData(p);
856
857     // save mainchares 
858     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
859         
860 #ifndef CMK_CHARE_USE_PTR
861     // save non-migratable chare
862     CkPupChareData(p);
863 #endif
864
865     // save groups into Groups.dat
866     CkPupGroupData(p,create);
867
868     // save nodegroups into NodeGroups.dat
869     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
870 }
871
872 void CkMemCheckPT::sendProcData()
873 {
874   // find out size of buffer
875   int size;
876   {
877     PUP::sizer p;
878     _handleProcData(p,CmiTrue);
879     size = p.size();
880   }
881   int packSize = size;
882   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
883   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
884   {
885     PUP::toMem p(msg->packData);
886     _handleProcData(p,CmiTrue);
887   }
888   msg->pe = CkMyPe();
889   msg->len = size;
890   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
891   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
892 }
893
894 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
895 {
896   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
897   CpvAccess(procChkptBuf) = msg;
898   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
899   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
900 }
901
902 // ArrayElement call this function to give us the checkpointed data
903 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
904 {
905   int len = ckTable.length();
906   int idx;
907   for (idx=0; idx<len; idx++) {
908     CkCheckPTInfo *entry = ckTable[idx];
909     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
910   }
911   CkAssert(idx < len);
912   int isChkpting = msg->cp_flag;
913   ckTable[idx]->updateBuffer(msg);
914   if (isChkpting) {
915       // all my array elements have returned their inmem data
916       // inform starter processor that I am done.
917     recvCount ++;
918     if (recvCount == ckTable.length()) {
919       if (where == CkCheckPoint_inMEM) {
920         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
921       }
922       else if (where == CkCheckPoint_inDISK) {
923         // another barrier for finalize the writing using fsync
924         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
925         contribute(0,NULL,CkReduction::sum_int,localcb);
926       }
927       else
928         CmiAbort("Unknown checkpoint scheme");
929       recvCount = 0;
930     } 
931   }
932 }
933
934 // only used in disk checkpointing
935 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
936 {
937   delete m;
938 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
939   system("sync");
940 #endif
941   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
942 }
943
944 // only is called on cpStarter when checkpoint is done
945 void CkMemCheckPT::cpFinish()
946 {
947   CmiAssert(CkMyPe() == cpStarter);
948   peCount++;
949     // now that all processors have finished, activate callback
950   if (peCount == 2) 
951 {
952     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
953     peCount = 0;
954     thisProxy.report();
955   }
956 }
957
958 // for debugging, report checkpoint info
959 void CkMemCheckPT::report()
960 {
961         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
962         inCheckpointing = 0;
963 #if !CMK_CHKP_ALL
964   int objsize = 0;
965   int len = ckTable.length();
966   for (int i=0; i<len; i++) {
967     CkCheckPTInfo *entry = ckTable[i];
968     CmiAssert(entry);
969     objsize += entry->getSize();
970   }
971   CmiAssert(CpvAccess(procChkptBuf));
972   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
973 #else
974   if(CkMyPe()==0)
975   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
976 #endif
977 }
978
979 /*****************************************************************************
980                         RESTART Procedure
981 *****************************************************************************/
982
983 // master processor of two buddies
984 inline int CkMemCheckPT::isMaster(int buddype)
985 {
986 #if 0
987   int mype = CkMyPe();
988 //CkPrintf("ismaster: %d %d\n", pe, mype);
989   if (CkNumPes() - totalFailed() == 2) {
990     return mype > buddype;
991   }
992   for (int i=1; i<CkNumPes(); i++) {
993     int me = (buddype+i)%CkNumPes();
994     if (isFailed(me)) continue;
995     if (me == mype) return 1;
996     else return 0;
997   }
998   return 0;
999 #else
1000     // smaller one
1001   int mype = CkMyPe();
1002 //CkPrintf("ismaster: %d %d\n", pe, mype);
1003   if (CkNumPes() - totalFailed() == 2) {
1004     return mype < buddype;
1005   }
1006 #if NODE_CHECKPOINT
1007   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1008   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1009 #else
1010   for (int i=1; i<CkNumPes(); i++) {
1011 #endif
1012     int me = (mype+i)%CkNumPes();
1013     if (isFailed(me)) continue;
1014     if (me == buddype) return 1;
1015     else return 0;
1016   }
1017   return 0;
1018 #endif
1019 }
1020
1021
1022
1023 #if 0
1024 // helper class to pup all elements that belong to same ckLocMgr
1025 class ElementDestoryer : public CkLocIterator {
1026 private:
1027         CkLocMgr *locMgr;
1028 public:
1029         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1030         void addLocation(CkLocation &loc) {
1031                 CkArrayIndex idx=loc.getIndex();
1032                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1033                 loc.destroy();
1034         }
1035 };
1036 #endif
1037
1038 // restore the bitmap vector for LB
1039 void CkMemCheckPT::resetLB(int diepe)
1040 {
1041 #if CMK_LBDB_ON
1042   int i;
1043   char *bitmap = new char[CkNumPes()];
1044   // set processor available bitmap
1045   get_avail_vector(bitmap);
1046
1047   for (i=0; i<failedPes.length(); i++)
1048     bitmap[failedPes[i]] = 0; 
1049   bitmap[diepe] = 0;
1050
1051 #if CK_NO_PROC_POOL
1052   set_avail_vector(bitmap);
1053 #endif
1054
1055   // if I am the crashed pe, rebuild my failedPEs array
1056   if (CkMyNode() == diepe)
1057     for (i=0; i<CkNumPes(); i++) 
1058       if (bitmap[i]==0) failed(i);
1059
1060   delete [] bitmap;
1061 #endif
1062 }
1063
1064 static double restartT;
1065
1066 // in case when failedPe dies, everybody go through its checkpoint table:
1067 // destory all array elements
1068 // recover lost buddies
1069 // reconstruct all array elements from check point data
1070 // called on all processors
1071 void CkMemCheckPT::restart(int diePe)
1072 {
1073 #if CMK_MEM_CHECKPOINT
1074   double curTime = CmiWallTimer();
1075   if (CkMyPe() == diePe){
1076            restartT = CmiWallTimer();
1077     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1078   }
1079   stage = (char*)"resetLB";
1080   startTime = curTime;
1081   if (CkMyPe() == diePe)
1082     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1083
1084 #if CK_NO_PROC_POOL
1085   failed(diePe);        // add into the list of failed pes
1086 #endif
1087   thisFailedPe = diePe;
1088
1089   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1090
1091   inRestarting = 1;
1092                                                                                 
1093   // disable load balancer's barrier
1094   //if (CkMyPe() != diePe) resetLB(diePe);
1095
1096   CKLOCMGR_LOOP(mgr->startInserting(););
1097
1098
1099   if(CmiNumPartition()==1){
1100         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1101   }else{
1102         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1103         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1104   }
1105 #endif
1106 }
1107
1108 // loally remove all array elements
1109 void CkMemCheckPT::removeArrayElements()
1110 {
1111 #if CMK_MEM_CHECKPOINT
1112   int len = ckTable.length();
1113   double curTime = CmiWallTimer();
1114   if (CkMyPe() == thisFailedPe) 
1115     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1116   stage = (char*)"removeArrayElements";
1117   startTime = curTime;
1118
1119 //  if (cpCallback.isInvalid()) 
1120 //        CkPrintf("invalid pe %d\n",CkMyPe());
1121 //        CkAbort("Didn't set restart callback\n");;
1122   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1123
1124   // get rid of all buffering and remote recs
1125   // including destorying all array elements
1126 #if CK_NO_PROC_POOL  
1127         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1128 #else
1129         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1130 #endif
1131   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1132 #endif
1133 }
1134
1135 // flush state in reduction manager
1136 void CkMemCheckPT::resetReductionMgr()
1137 {
1138   if (CkMyPe() == thisFailedPe) 
1139     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1140   int numGroups = CkpvAccess(_groupIDTable)->size();
1141   for(int i=0;i<numGroups;i++) {
1142     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1143     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1144     obj->flushStates();
1145     obj->ckJustMigrated();
1146   }
1147   // reset again
1148   //CpvAccess(_qd)->flushStates();
1149   if(CmiNumPartition()==1){
1150         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1151   }
1152   else
1153         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1154 }
1155
1156 // recover the lost buddies
1157 void CkMemCheckPT::recoverBuddies()
1158 {
1159   int idx;
1160   int len = ckTable.length();
1161   // ready to flush reduction manager
1162   // cannot be CkMemCheckPT::restart because destory will modify states
1163   double curTime = CmiWallTimer();
1164   if (CkMyPe() == thisFailedPe)
1165   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1166   stage = (char *)"recoverBuddies";
1167   if (CkMyPe() == thisFailedPe)
1168   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1169   startTime = curTime;
1170
1171   // recover buddies
1172   expectCount = 0;
1173 #if !CMK_CHKP_ALL
1174   for (idx=0; idx<len; idx++) {
1175     CkCheckPTInfo *entry = ckTable[idx];
1176     if (entry->pNo == thisFailedPe) {
1177 #if CK_NO_PROC_POOL
1178       // find a new buddy
1179       int budPe = BuddyPE(CkMyPe());
1180 #else
1181       int budPe = thisFailedPe;
1182 #endif
1183       CkArrayCheckPTMessage *msg = entry->getCopy();
1184       msg->bud1 = budPe;
1185       msg->bud2 = CkMyPe();
1186       msg->cp_flag = 0;            // not checkpointing
1187       thisProxy[budPe].recoverEntry(msg);
1188       expectCount ++;
1189     }
1190   }
1191 #else
1192   //send to failed pe
1193   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1194 #if CK_NO_PROC_POOL
1195       // find a new buddy
1196       int budPe = BuddyPE(CkMyPe());
1197 #else
1198       int budPe = thisFailedPe;
1199 #endif
1200       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1201       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1202           msg->cp_flag = 0;            // not checkpointing
1203       msg->bud1 = budPe;
1204       msg->bud2 = CkMyPe();
1205       thisProxy[budPe].recoverEntry(msg);
1206       expectCount ++;
1207   }
1208 #endif
1209
1210   if (expectCount == 0) {
1211           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1212   }
1213   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1214 }
1215
1216 void CkMemCheckPT::gotData()
1217 {
1218   ackCount ++;
1219   if (ackCount == expectCount) {
1220     ackCount = 0;
1221     expectCount = -1;
1222     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1223     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1224   }
1225 }
1226
1227 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1228 {
1229
1230   for (int i=0; i<n; i++) {
1231     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1232     mgr->updateLocation(idx[i], nowOnPe);
1233   }
1234         thisProxy[nowOnPe].gotReply();
1235 }
1236
1237 // restore array elements
1238 void CkMemCheckPT::recoverArrayElements()
1239 {
1240   double curTime = CmiWallTimer();
1241   int len = ckTable.length();
1242   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1243   stage = (char *)"recoverArrayElements";
1244   if (CkMyPe() == thisFailedPe)
1245   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1246   startTime = curTime;
1247  int flag = 0;
1248   // recover all array elements
1249   int count = 0;
1250
1251 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1252   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1253   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1254 #endif
1255
1256 #if !CMK_CHKP_ALL
1257   for (int idx=0; idx<len; idx++)
1258   {
1259     CkCheckPTInfo *entry = ckTable[idx];
1260 #if CK_NO_PROC_POOL
1261     // the bigger one will do 
1262 //    if (CkMyPe() < entry->pNo) continue;
1263     if (!isMaster(entry->pNo)) continue;
1264 #else
1265     // smaller one do it, which has the original object
1266     if (CkMyPe() == entry->pNo+1 || 
1267         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1268 #endif
1269 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1270
1271     entry->updateBuddy(CkMyPe(), entry->pNo);
1272     CkArrayCheckPTMessage *msg = entry->getCopy();
1273     // gzheng
1274     //thisProxy[CkMyPe()].inmem_restore(msg);
1275     inmem_restore(msg);
1276 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1277     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1278     int homePe = mgr->homePe(msg->index);
1279     if (homePe != CkMyPe()) {
1280       gmap[homePe].push_back(msg->locMgr);
1281       imap[homePe].push_back(msg->index);
1282     }
1283 #endif
1284     CkFreeMsg(msg);
1285     count ++;
1286   }
1287 #else
1288         char * packData;
1289         if(CmiNumPartition()==1){
1290                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1291                 packData = (char *)msg->packData;
1292         }
1293         else{
1294                 int pointer = CpvAccess(curPointer);
1295                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1296                 packData = msg->packData;
1297         }
1298 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1299         recoverAll(packData,gmap,imap);
1300 #else
1301         recoverAll(packData);
1302 #endif
1303 #endif
1304   curTime = CmiWallTimer();
1305   //if (CkMyPe() == thisFailedPe)
1306   if (CkMyPe() == 0)
1307         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1308 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1309   for (int i=0; i<CkNumPes(); i++) {
1310     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1311       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1312         flag++; 
1313         }
1314   }
1315   delete [] imap;
1316   delete [] gmap;
1317 #endif
1318   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1319
1320   CKLOCMGR_LOOP(mgr->doneInserting(););
1321
1322   // _crashedNode = -1;
1323   CpvAccess(_crashedNode) = -1;
1324   inRestarting = 0;
1325 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1326   if (CkMyPe() == 0)
1327     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1328 #else
1329 if(flag == 0)
1330 {
1331     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1332 }
1333 #endif
1334 }
1335
1336 void CkMemCheckPT::gotReply(){
1337     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1338 }
1339
1340 void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1341 #if CMK_CHKP_ALL
1342         PUP::fromMem p(packData);
1343         int numElements;
1344         p|numElements;
1345         if(p.isUnpacking()){
1346                 for(int i=0;i<numElements;i++){
1347                         CkGroupID gID;
1348                         CkArrayIndex idx;
1349                         p|gID;
1350                         p|idx;
1351                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1352                         int homePe = mgr->homePe(idx);
1353 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1354                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1355 #else
1356                         if(CmiNumPartition()==1)
1357                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1358                         else{
1359                                 if(CkMyPe()==thisFailedPe){
1360                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1361                                 }
1362                         else{
1363                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1364                                 }
1365                         }
1366 #endif
1367 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1368                         homePe = mgr->homePe(idx);
1369                         if (homePe != CkMyPe()) {
1370                           gmap[homePe].push_back(gID);
1371                           imap[homePe].push_back(idx);
1372                         }
1373 #endif
1374                 }
1375         }
1376 #endif
1377 }
1378
1379
1380 // on every processor
1381 // turn load balancer back on
1382 void CkMemCheckPT::finishUp()
1383 {
1384   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1385   //CKLOCMGR_LOOP(mgr->doneInserting(););
1386   
1387   if (CkMyPe() == thisFailedPe)
1388   {
1389        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1390        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1391   }
1392   CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1393         
1394 #if CMK_CONVERSE_MPI    
1395   if(CmiNumPartition()!=1){
1396         CpvAccess(recvdProcChkp) = 0;
1397         CpvAccess(recvdArrayChkp) = 0;
1398         CpvAccess(curPointer)^=1;
1399         //notify my replica, restart is done
1400    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1401    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1402    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1403   }
1404    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1405          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1406    }
1407 #endif
1408
1409 #if CK_NO_PROC_POOL
1410 #if NODE_CHECKPOINT
1411   int numnodes = CmiNumPhysicalNodes();
1412 #else
1413   int numnodes = CkNumPes();
1414 #endif
1415   if (numnodes-totalFailed() <=2) {
1416     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1417     _memChkptOn = 0;
1418   }
1419 #endif
1420 }
1421
1422 void CkMemCheckPT::recoverFromSoftFailure()
1423 {
1424         inRestarting = 0;
1425         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1426 }
1427 // called only on 0
1428 void CkMemCheckPT::quiescence(CkCallback &cb)
1429 {
1430   static int pe_count = 0;
1431   pe_count ++;
1432   CmiAssert(CkMyPe() == 0);
1433   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1434   if (pe_count == CkNumPes()) {
1435     pe_count = 0;
1436     cb.send();
1437   }
1438 }
1439
1440 // User callable function - to start a checkpoint
1441 // callback cb is used to pass control back
1442 void CkStartMemCheckpoint(CkCallback &cb)
1443 {
1444 #if CMK_MEM_CHECKPOINT
1445         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1446   if (_memChkptOn == 0) {
1447     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1448     cb.send();
1449     return;
1450   }
1451   if (CkInRestarting()) {
1452       // trying to checkpointing during restart
1453     cb.send();
1454     return;
1455   }
1456     // store user callback and user data
1457   CkMemCheckPT::cpCallback = cb;
1458
1459     // broadcast to start check pointing
1460   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1461   checkptMgr.doItNow(CkMyPe(), cb);
1462 #else
1463   // when mem checkpoint is disabled, invike cb immediately
1464   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1465   cb.send();
1466 #endif
1467 }
1468
1469 void CkRestartCheckPoint(int diePe)
1470 {
1471 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1472   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1473   // broadcast
1474   checkptMgr.restart(diePe);
1475 }
1476
1477 static int _diePE = -1;
1478
1479 // callback function used locally by ccs handler
1480 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1481 {
1482 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1483   CkRestartCheckPoint(_diePE);
1484 }
1485
1486
1487 // called on crashed PE
1488 static void restartBeginHandler(char *msg)
1489 {
1490   CmiFree(msg);
1491 #if CMK_MEM_CHECKPOINT
1492 #if CMK_USE_BARRIER
1493         if(CkMyPe()!=_diePE){
1494                 printf("restar begin on %d\n",CkMyPe());
1495                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1496                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1497                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1498         }else{
1499         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1500         CkRestartCheckPointCallback(NULL, NULL);
1501         }
1502 #else
1503   static int count = 0;
1504   CmiAssert(CkMyPe() == _diePE);
1505   count ++;
1506   if (count == CkNumPes()) {
1507           printf("restart begin on %d\n",CkMyPe());
1508     CkRestartCheckPointCallback(NULL, NULL);
1509     count = 0;
1510   }
1511 #endif
1512 #endif
1513 }
1514
1515 extern void _discard_charm_message();
1516 extern void _resume_charm_message();
1517
1518 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1519         return data;
1520 }
1521
1522 static void restartBcastHandler(char *msg)
1523 {
1524 #if CMK_MEM_CHECKPOINT
1525   // advance phase counter
1526   CkMemCheckPT::inRestarting = 1;
1527   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1528   // gzheng
1529   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1530
1531   if (CkMyPe()==_diePE)
1532     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1533
1534   // reset QD counters
1535 /*  gzheng
1536   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1537 */
1538
1539 /*  gzheng
1540   if (CkMyPe()==_diePE)
1541       CkRestartCheckPointCallback(NULL, NULL);
1542 */
1543   CmiFree(msg);
1544
1545   _resume_charm_message();
1546
1547     // reduction
1548   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1549   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1550 #if CMK_USE_BARRIER
1551         //CmiPrintf("before reduce\n"); 
1552         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1553         //CmiPrintf("after reduce\n");  
1554 #else
1555   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1556 #endif 
1557  checkpointed = 0;
1558 #endif
1559 }
1560
1561 extern void _initDone();
1562
1563 bool compare(char * buf1, char *buf2){
1564         PUP::checker pchecker(buf1,buf2);
1565         pchecker.skip();
1566         
1567         int numElements;
1568         pchecker|numElements;
1569         for(int i=0;i<numElements;i++){
1570         //for(int i=0;i<1;i++){
1571                 CkGroupID gID;
1572                 CkArrayIndex idx;
1573                 
1574                 pchecker|gID;
1575                 pchecker|idx;
1576                 
1577                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1578                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1579         }
1580         return pchecker.getResult();
1581         //return true;
1582 }
1583
1584 static void recvRemoteChkpHandler(char *msg){
1585    envelope *env = (envelope *)msg;
1586    CkUnpackMessage(&env);
1587    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1588   if(CpvAccess(recvdLocal)==1){
1589           int pointer = CpvAccess(curPointer);
1590           int size = CpvAccess(chkpBuf)[pointer]->len;
1591           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1592           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1593           }else
1594           {
1595                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1596           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1597           }
1598           delete chkpMsg;
1599           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1600   }else{
1601           CpvAccess(recvdRemote) = 1;
1602           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1603           CpvAccess(buddyBuf) = chkpMsg;
1604   }  
1605 }
1606
1607 static void replicaRecoverHandler(char *msg){
1608         CpvAccess(_remoteCrashedNode) = -1;
1609         CkMemCheckPT::replicaAlive = 1;
1610     bool ret = true;
1611     CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1612         CmiFree(msg);
1613         
1614 }
1615
1616 static void replicaDieHandler(char * msg){
1617 #if CMK_CONVERSE_MPI    
1618         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1619         CpvAccess(_remoteCrashedNode) = diePe;
1620         CkMemCheckPT::replicaAlive = 0;
1621         find_spare_mpirank(diePe,CmiMyPartition()^1);
1622     if(CkMyPe()==diePe){
1623       CkPrintf("pe %d in replicad word die\n",diePe);
1624             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1625     }
1626 #endif
1627         //broadcast to my partition to get local max iter
1628    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1629         CmiFree(msg);
1630 }
1631
1632
1633 static void replicaDieBcastHandler(char *msg){
1634         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1635         CpvAccess(_remoteCrashedNode) = diePe;
1636         CkMemCheckPT::replicaAlive = 0;
1637         CmiFree(msg);
1638 }
1639
1640 static void recoverRemoteProcDataHandler(char *msg){
1641    envelope *env = (envelope *)msg;
1642    CkUnpackMessage(&env);
1643    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1644         
1645    //store the checkpoint
1646         int pointer = procMsg->pointer;
1647
1648
1649    if(CkMyPe()==CpvAccess(_crashedNode)){
1650            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1651            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1652            PUP::fromMem p(procMsg->packData);
1653            _handleProcData(p,CmiTrue);
1654            _initDone();
1655    }
1656    else{
1657            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1658            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1659            //_handleProcData(p,CmiFalse);
1660    }
1661    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1662    CKLOCMGR_LOOP(mgr->startInserting(););
1663    
1664    CpvAccess(recvdProcChkp) =1;
1665         if(CpvAccess(recvdArrayChkp)==1){
1666                 _resume_charm_message();
1667                 _diePE = CpvAccess(_crashedNode);
1668                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1669                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1670                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1671         }
1672 }
1673
1674 static void recoverRemoteArrayDataHandler(char *msg){
1675    envelope *env = (envelope *)msg;
1676    CkUnpackMessage(&env);
1677    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1678         
1679    //store the checkpoint
1680         int pointer = chkpMsg->pointer;
1681         CpvAccess(curPointer) = pointer;
1682         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1683         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1684    CpvAccess(recvdArrayChkp) =1;
1685         CkMemCheckPT::inRestarting = 1;
1686         if(CpvAccess(recvdProcChkp) == 1){
1687                 _resume_charm_message();
1688                 _diePE = CpvAccess(_crashedNode);
1689                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1690                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1691                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1692                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1693         }
1694 }
1695
1696 static void recvPhaseHandler(char * msg)
1697 {
1698         CpvAccess(_curRestartPhase)--;
1699         CkMemCheckPT::inRestarting = 1;
1700         CmiFree(msg);
1701 }
1702 // called on crashed processor
1703 static void recoverProcDataHandler(char *msg)
1704 {
1705 #if CMK_MEM_CHECKPOINT
1706    int i;
1707    envelope *env = (envelope *)msg;
1708    CkUnpackMessage(&env);
1709    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1710    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1711    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1712    PUP::fromMem p(procMsg->packData);
1713    _handleProcData(p,CmiTrue);
1714
1715    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1716    // gzheng
1717    CKLOCMGR_LOOP(mgr->startInserting(););
1718
1719    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1720    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1721    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1722    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1723
1724    _initDone();
1725 //   CpvAccess(_qd)->flushStates();
1726    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1727 #endif
1728 }
1729 //for replica, got the phase number from my neighbor processor in the current partition
1730 static void askPhaseHandler(char *msg)
1731 {
1732 #if CMK_MEM_CHECKPOINT
1733         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1734     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1735         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1736     CmiSetHandler(msg, recvPhaseHandlerIdx);
1737         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1738 #endif
1739 }
1740 // called on its backup processor
1741 // get backup message buffer and sent to crashed processor
1742 static void askProcDataHandler(char *msg)
1743 {
1744 #if CMK_MEM_CHECKPOINT
1745     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1746     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1747     if (CpvAccess(procChkptBuf) == NULL)  {
1748       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1749       CkAbort("no checkpoint found");
1750     }
1751     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1752     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1753
1754     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1755
1756     CkPackMessage(&env);
1757     CmiSetHandler(env, recoverProcDataHandlerIdx);
1758     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1759     CpvAccess(procChkptBuf) = NULL;
1760     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1761 #endif
1762 }
1763
1764 // called on PE 0
1765 void qd_callback(void *m)
1766 {
1767    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1768    CkFreeMsg(m);
1769    if(CmiNumPartition()==1){
1770 #ifdef CMK_SMP
1771            for(int i=0;i<CmiMyNodeSize();i++){
1772            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1773            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1774                 CmiSetHandler(msg, askProcDataHandlerIdx);
1775                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1776                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1777            }
1778            return;
1779 #endif
1780            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1781            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1782            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1783            CmiSetHandler(msg, askProcDataHandlerIdx);
1784            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1785            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1786         }
1787    else{
1788                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1789                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1790                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1791    }
1792 }
1793
1794 // on crashed node
1795 void CkMemRestart(const char *dummy, CkArgMsg *args)
1796 {
1797 #if CMK_MEM_CHECKPOINT
1798    _diePE = CmiMyNode();
1799    CpvAccess( _crashedNode )= CmiMyNode();
1800    CkMemCheckPT::inRestarting = 1;
1801    _discard_charm_message();
1802   
1803   /*if(CmiMyRank()==0){
1804     CkCallback cb(qd_callback);
1805     CkStartQD(cb);
1806     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1807   }*/
1808    if(CmiNumPartition()==1){
1809            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1810                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1811                 restartT = CmiWallTimer();
1812            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1813            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1814            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1815            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1816            CmiSetHandler(msg, askProcDataHandlerIdx);
1817            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1818            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1819    }
1820    else{
1821                 CkCallback cb(qd_callback);
1822                 CkStartQD(cb);
1823    }
1824 #else
1825    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1826 #endif
1827 }
1828
1829 // can be called in other files
1830 // return true if it is in restarting
1831 extern "C"
1832 int CkInRestarting()
1833 {
1834 #if CMK_MEM_CHECKPOINT
1835   if (CpvAccess( _crashedNode)!=-1) return 1;
1836   // gzheng
1837   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1838   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1839   return CkMemCheckPT::inRestarting;
1840 #else
1841   return 0;
1842 #endif
1843 }
1844
1845 extern "C"
1846 int CkReplicaAlive()
1847 {
1848 //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1849 //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1850         return CkMemCheckPT::replicaAlive;
1851
1852         /*if(CkMemCheckPT::replicaDead==1)
1853                 return 0;
1854         else            
1855                 return 1;*/
1856 }
1857
1858 extern "C"
1859 int CkInCheckpointing()
1860 {
1861         return CkMemCheckPT::inCheckpointing;
1862 }
1863
1864 extern "C"
1865 void CkSetInLdb(){
1866 #if CMK_MEM_CHECKPOINT
1867         CkMemCheckPT::inLoadbalancing = 1;
1868 #endif
1869 }
1870
1871 extern "C"
1872 int CkInLdb(){
1873 #if CMK_MEM_CHECKPOINT
1874         return CkMemCheckPT::inLoadbalancing;
1875 #endif
1876         return 0;
1877 }
1878
1879 extern "C"
1880 void CkResetInLdb(){
1881 #if CMK_MEM_CHECKPOINT
1882         CkMemCheckPT::inLoadbalancing = 0;
1883 #endif
1884 }
1885
1886 /*****************************************************************************
1887                 module initialization
1888 *****************************************************************************/
1889
1890 static int arg_where = CkCheckPoint_inMEM;
1891
1892 #if CMK_MEM_CHECKPOINT
1893 void init_memcheckpt(char **argv)
1894 {
1895     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1896       arg_where = CkCheckPoint_inDISK;
1897     }
1898         // initiliazing _crashedNode variable
1899         CpvInitialize(int, _crashedNode);
1900         CpvInitialize(int, _remoteCrashedNode);
1901         CpvAccess(_crashedNode) = -1;
1902         CpvAccess(_remoteCrashedNode) = -1;
1903         init_FI(argv);
1904
1905 }
1906 #endif
1907
1908 class CkMemCheckPTInit: public Chare {
1909 public:
1910   CkMemCheckPTInit(CkArgMsg *m) {
1911 #if CMK_MEM_CHECKPOINT
1912     if (arg_where == CkCheckPoint_inDISK) {
1913       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1914     }
1915     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1916     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1917 #endif
1918   }
1919 };
1920
1921 static void notifyHandler(char *msg)
1922 {
1923 #if CMK_MEM_CHECKPOINT
1924   CmiFree(msg);
1925       /* immediately increase restart phase to filter old messages */
1926   CpvAccess(_curRestartPhase) ++;
1927   CpvAccess(_qd)->flushStates();
1928   _discard_charm_message();
1929
1930 #endif
1931 }
1932
1933 extern "C"
1934 void notify_crash(int node)
1935 {
1936 #ifdef CMK_MEM_CHECKPOINT
1937   CpvAccess( _crashedNode) = node;
1938 #ifdef CMK_SMP
1939   for(int i=0;i<CkMyNodeSize();i++){
1940         CpvAccessOther(_crashedNode,i)=node;
1941   }
1942 #endif
1943   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1944   CkMemCheckPT::inRestarting = 1;
1945
1946     // this may be in interrupt handler, send a message to reset QD
1947   int pe = CmiNodeFirst(CkMyNode());
1948   for(int i=0;i<CkMyNodeSize();i++){
1949         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1950         CmiSetHandler(msg, notifyHandlerIdx);
1951         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1952   }
1953 #endif
1954 }
1955
1956 extern "C" void (*notify_crash_fn)(int node);
1957
1958
1959 #if CMK_CONVERSE_MPI
1960 void buddyDieHandler(char *msg)
1961 {
1962 #if CMK_MEM_CHECKPOINT
1963    // notify
1964         CkMemCheckPT::inRestarting = 1;
1965    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1966    notify_crash(diepe);
1967    // send message to crash pe to let it restart
1968    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1969    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1970    int buddy = obj->BuddyPE(CmiMyPe());
1971    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1972    if (buddy == diepe)  {
1973      mpi_restart_crashed(diepe, newrank);
1974    }
1975 #endif
1976 }
1977
1978 void pingHandler(void *msg)
1979 {
1980   lastPingTime = CmiWallTimer();
1981   CmiFree(msg);
1982 }
1983
1984 void pingCheckHandler()
1985 {
1986 #if CMK_MEM_CHECKPOINT
1987   double now = CmiWallTimer();
1988   if (lastPingTime > 0 && now - lastPingTime > 3 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
1989   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
1990     int i, pe, buddy;
1991     // tell everyone the buddy dies
1992     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1993     for (i = 1; i < CmiNumPes(); i++) {
1994        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1995        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1996     }
1997     buddy = pe;
1998     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
1999     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2000     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2001     CmiSetHandler(msg, buddyDieHandlerIdx);
2002     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2003     //send to everyone in the other world
2004         if(CmiNumPartition()!=1){
2005                 for(int i=0;i<CmiNumPes();i++){
2006                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2007                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2008                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
2009                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2010                 }
2011         }
2012   }
2013   else 
2014     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2015 #endif
2016 }
2017
2018 void pingBuddy()
2019 {
2020 #if CMK_MEM_CHECKPOINT
2021   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2022   if (obj) {
2023     int buddy = obj->BuddyPE(CkMyPe());
2024     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2025     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2026     CmiSetHandler(msg, pingHandlerIdx);
2027     CmiGetRestartPhase(msg) = 9999;
2028     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2029   }
2030   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2031 #endif
2032 }
2033 #endif
2034
2035 // initproc
2036 void CkRegisterRestartHandler( )
2037 {
2038 #if CMK_MEM_CHECKPOINT
2039   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2040   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2041   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2042   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2043   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2044   
2045   //for replica
2046   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2047   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2048   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2049   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2050   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2051   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2052   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2053   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2054
2055 #if CMK_CONVERSE_MPI
2056   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2057   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2058   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2059 #endif
2060
2061   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2062   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2063   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2064   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2065   CpvInitialize(int,curPointer);
2066   CpvInitialize(int,recvdLocal);
2067   CpvInitialize(int,recvdRemote);
2068   CpvInitialize(int,recvdProcChkp);
2069   CpvInitialize(int,recvdArrayChkp);
2070
2071   CpvAccess(procChkptBuf) = NULL;
2072   CpvAccess(buddyBuf) = NULL;
2073   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2074   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2075   CpvAccess(chkpBuf)[0] = NULL;
2076   CpvAccess(chkpBuf)[1] = NULL;
2077   CpvAccess(localProcChkpBuf)[0] = NULL;
2078   CpvAccess(localProcChkpBuf)[1] = NULL;
2079   
2080   CpvAccess(curPointer) = 0;
2081   CpvAccess(recvdLocal) = 0;
2082   CpvAccess(recvdRemote) = 0;
2083   CpvAccess(recvdProcChkp) = 0;
2084   CpvAccess(recvdArrayChkp) = 0;
2085
2086   notify_crash_fn = notify_crash;
2087
2088 #if ! CMK_CONVERSE_MPI
2089   // print pid to kill
2090 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2091 //  sleep(4);
2092 #endif
2093 #endif
2094 }
2095
2096
2097 extern "C"
2098 int CkHasCheckpoints()
2099 {
2100   return checkpointed;
2101 }
2102
2103 /// @todo: the following definitions should be moved to a separate file containing
2104 // structures and functions about fault tolerance strategies
2105
2106 /**
2107  *  * @brief: function for killing a process                                             
2108  *   */
2109 #ifdef CMK_MEM_CHECKPOINT
2110 #if CMK_HAS_GETPID
2111 void killLocal(void *_dummy,double curWallTime){
2112         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2113         if(CmiWallTimer()<killTime-1){
2114                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2115         }else{ 
2116 #if CMK_CONVERSE_MPI
2117                                 CkDieNow();
2118 #else 
2119                 kill(getpid(),SIGKILL);                                               
2120 #endif
2121         }              
2122
2123 #else
2124 void killLocal(void *_dummy,double curWallTime){
2125   CmiAbort("kill() not supported!");
2126 }
2127 #endif
2128 #endif
2129
2130 #ifdef CMK_MEM_CHECKPOINT
2131 /**
2132  * @brief: reads the file with the kill information
2133  */
2134 void readKillFile(){
2135         FILE *fp=fopen(killFile,"r");
2136         if(!fp){
2137                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2138                 return;
2139         }
2140         int proc;
2141         double sec;
2142         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2143                 if(proc == CkMyNode() && CkMyRank() == 0){
2144                         killTime = CmiWallTimer()+sec;
2145                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2146                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2147                 }
2148         }
2149         fclose(fp);
2150 }
2151
2152 #if ! CMK_CONVERSE_MPI
2153 void CkDieNow()
2154 {
2155 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2156          // ignored for non-mpi version
2157         CmiPrintf("[%d] die now.\n", CmiMyPe());
2158         killTime = CmiWallTimer()+0.001;
2159         CcdCallFnAfter(killLocal,NULL,1);
2160 #endif
2161 }
2162 #endif
2163
2164 #endif
2165
2166 #include "CkMemCheckpoint.def.h"
2167
2168