pass hard failure recover test on BGQ
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74 CpvDeclare(int, _remoteCrashedNode);
75
76 // static, so that it is accessible from Converse part
77 int CkMemCheckPT::inRestarting = 0;
78 int CkMemCheckPT::replicaAlive = 1;
79 int CkMemCheckPT::inLoadbalancing = 0;
80 double CkMemCheckPT::startTime;
81 char *CkMemCheckPT::stage;
82 CkCallback CkMemCheckPT::cpCallback;
83
84 int _memChkptOn = 1;                    // checkpoint is on or off
85
86 CkGroupID ckCheckPTGroupID;             // readonly
87
88 static int checkpointed = 0;
89
90 /// @todo the following declarations should be moved into a separate file for all 
91 // fault tolerant strategies
92
93 #ifdef CMK_MEM_CHECKPOINT
94 // name of the kill file that contains processes to be killed 
95 char *killFile;                                               
96 // flag for the kill file         
97 int killFlag=0;
98 // variable for storing the killing time
99 double killTime=0.0;
100 #endif
101
102 #ifdef CKLOCMGR_LOOP
103 #undef CKLOCMGR_LOOP
104 #endif
105 // loop over all CkLocMgr and do "code"
106 #define  CKLOCMGR_LOOP(code)    {       \
107   int numGroups = CkpvAccess(_groupIDTable)->size();    \
108   for(int i=0;i<numGroups;i++) {        \
109     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
110     if(obj->isLocMgr())  {      \
111       CkLocMgr *mgr = (CkLocMgr*)obj;   \
112       code      \
113     }   \
114   }     \
115  }
116
117 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
118 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
119 CpvDeclare(CkCheckPTMessage**, chkpBuf);
120 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
121 //store the checkpoint of the buddy to compare
122 //do not need the whole msg, can be the checksum
123 CpvDeclare(CkCheckPTMessage*, buddyBuf);
124 //pointer of the checkpoint going to be written
125 CpvDeclare(int, curPointer);
126 CpvDeclare(int, recvdRemote);
127 CpvDeclare(int, recvdLocal);
128 CpvDeclare(int, recvdArrayChkp);
129 CpvDeclare(int, recvdProcChkp);
130
131 bool compare(char * buf1, char * buf2);
132 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
133 // Converse function handles
134 static int askPhaseHandlerIdx;
135 static int recvPhaseHandlerIdx;
136 static int askProcDataHandlerIdx;
137 static int restartBcastHandlerIdx;
138 static int recoverProcDataHandlerIdx;
139 static int restartBeginHandlerIdx;
140 static int recvRemoteChkpHandlerIdx;
141 static int replicaDieHandlerIdx;
142 static int replicaDieBcastHandlerIdx;
143 static int replicaRecoverHandlerIdx;
144 static int recoverRemoteProcDataHandlerIdx;
145 static int recoverRemoteArrayDataHandlerIdx;
146 static int notifyHandlerIdx;
147 // compute the backup processor
148 // FIXME: avoid crashed processors
149 #if CMK_CONVERSE_MPI
150 static int pingHandlerIdx;
151 static int pingCheckHandlerIdx;
152 static int buddyDieHandlerIdx;
153 static double lastPingTime = -1;
154
155 extern "C" void mpi_restart_crashed(int pe, int rank);
156 extern "C" int  find_spare_mpirank(int pe,int partition);
157
158 void pingBuddy();
159 void pingCheckHandler();
160
161 #endif
162 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
163
164 inline int CkMemCheckPT::BuddyPE(int pe)
165 {
166   int budpe;
167 #if NODE_CHECKPOINT
168     // buddy is the processor with same rank on the next physical node
169   int r1 = CmiPhysicalRank(pe);
170   int budnode = CmiPhysicalNodeID(pe);
171   do {
172     budnode = (budnode+1)%CmiNumPhysicalNodes();
173     int *pelist;
174     int num;
175     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
176     budpe = pelist[r1 % num];
177   } while (isFailed(budpe));
178   if (budpe == pe) {
179     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
180     CmiAbort("Failed to find a buddy processor");
181   }
182 #else
183   budpe = pe;
184   while (budpe == pe || isFailed(budpe)) 
185           budpe = (budpe+1)%CkNumPes();
186 #endif
187   return budpe;
188 }
189
190 // called in array element constructor
191 // choose and register with 2 buddies for checkpoiting 
192 #if CMK_MEM_CHECKPOINT
193 void ArrayElement::init_checkpt() {
194         if (_memChkptOn == 0) return;
195         if (CkInRestarting()) {
196           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
197         }
198         // only master init checkpoint
199         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
200
201         budPEs[0] = CkMyPe();
202         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
203         CmiAssert(budPEs[0] != budPEs[1]);
204         // inform checkPTMgr
205         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
206         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
207         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
208         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
209 }
210 #endif
211
212 // entry function invoked by checkpoint mgr asking for checkpoint data
213 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
214 #if CMK_MEM_CHECKPOINT
215 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
216 //char index[128];   thisIndexMax.sprint(index);
217 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
218   CkLocMgr *locMgr = thisArray->getLocMgr();
219   CmiAssert(myRec!=NULL);
220   int size;
221   {
222         PUP::sizer p;
223         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
224         size = p.size();
225   }
226   int packSize = size/sizeof(double) +1;
227   CkArrayCheckPTMessage *msg =
228                  new (packSize, 0) CkArrayCheckPTMessage;
229   msg->len = size;
230   msg->index =thisIndexMax;
231   msg->aid = thisArrayID;
232   msg->locMgr = locMgr->getGroupID();
233   msg->cp_flag = 1;
234   {
235         PUP::toMem p(msg->packData);
236         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
237   }
238
239   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
240   checkptMgr.recvData(msg, 2, budPEs);
241   delete m;
242 #endif
243 }
244
245 // checkpoint holder class - for memory checkpointing
246 class CkMemCheckPTInfo: public CkCheckPTInfo
247 {
248   CkArrayCheckPTMessage *ckBuffer;
249 public:
250   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
251                     CkCheckPTInfo(a, loc, idx, pno)
252   {
253     ckBuffer = NULL;
254   }
255   ~CkMemCheckPTInfo() 
256   {
257     if (ckBuffer) delete ckBuffer; 
258   }
259   inline void updateBuffer(CkArrayCheckPTMessage *data) 
260   {
261     CmiAssert(data!=NULL);
262     if (ckBuffer) delete ckBuffer;
263     ckBuffer = data;
264   }    
265   inline CkArrayCheckPTMessage * getCopy()
266   {
267     if (ckBuffer == NULL) {
268       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
269       CmiAbort("Abort!");
270     }
271     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
272   }     
273   inline void updateBuddy(int b1, int b2) {
274      CmiAssert(ckBuffer);
275      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
276      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
277      CmiAssert(pNo != CkMyPe());
278   }
279   inline int getSize() { 
280      CmiAssert(ckBuffer);
281      return ckBuffer->len; 
282   }
283 };
284
285 // checkpoint holder class - for in-disk checkpointing
286 class CkDiskCheckPTInfo: public CkCheckPTInfo 
287 {
288   char *fname;
289   int bud1, bud2;
290   int len;                      // checkpoint size
291 public:
292   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
293   {
294 #if CMK_USE_MKSTEMP
295     fname = new char[64];
296     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
297     mkstemp(fname);
298 #else
299     fname=tmpnam(NULL);
300 #endif
301     bud1 = bud2 = -1;
302     len = 0;
303   }
304   ~CkDiskCheckPTInfo() 
305   {
306     remove(fname);
307   }
308   inline void updateBuffer(CkArrayCheckPTMessage *data) 
309   {
310     double t = CmiWallTimer();
311     // unpack it
312     envelope *env = UsrToEnv(data);
313     CkUnpackMessage(&env);
314     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
315     FILE *f = fopen(fname,"wb");
316     PUP::toDisk p(f);
317     CkPupMessage(p, (void **)&data);
318     // delay sync to the end because otherwise the messages are blocked
319 //    fsync(fileno(f));
320     fclose(f);
321     bud1 = data->bud1;
322     bud2 = data->bud2;
323     len = data->len;
324     delete data;
325     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
326   }
327   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
328   {
329     CkArrayCheckPTMessage *data;
330     FILE *f = fopen(fname,"rb");
331     PUP::fromDisk p(f);
332     CkPupMessage(p, (void **)&data);
333     fclose(f);
334     data->bud1 = bud1;                          // update the buddies
335     data->bud2 = bud2;
336     return data;
337   }
338   inline void updateBuddy(int b1, int b2) {
339      bud1 = b1; bud2 = b2;
340      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
341      CmiAssert(pNo != CkMyPe());
342   }
343   inline int getSize() { 
344      return len; 
345   }
346 };
347
348 CkMemCheckPT::CkMemCheckPT(int w)
349 {
350   int numnodes = 0;
351 #if NODE_CHECKPOINT
352   numnodes = CmiNumPhysicalNodes();
353 #else
354   numnodes = CkNumPes();
355 #endif
356 #if CK_NO_PROC_POOL
357   if (numnodes <= 2)
358 #else
359   if (numnodes  == 1)
360 #endif
361   {
362     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
363     _memChkptOn = 0;
364   }
365   inRestarting = 0;
366   recvCount = peCount = 0;
367   ackCount = 0;
368   expectCount = -1;
369   where = w;
370   replicaAlive = 1;
371 #if CMK_CONVERSE_MPI
372   void pingBuddy();
373   void pingCheckHandler();
374   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
375   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
376 #endif
377   chkpTable[0] = NULL;
378   chkpTable[1] = NULL;
379 }
380
381 CkMemCheckPT::~CkMemCheckPT()
382 {
383   int len = ckTable.length();
384   for (int i=0; i<len; i++) {
385     delete ckTable[i];
386   }
387 }
388
389 void CkMemCheckPT::pup(PUP::er& p) 
390
391   CBase_CkMemCheckPT::pup(p); 
392   p|cpStarter;
393   p|thisFailedPe;
394   p|failedPes;
395   p|ckCheckPTGroupID;           // recover global variable
396   p|cpCallback;                 // store callback
397   p|where;                      // where to checkpoint
398   p|peCount;
399   if (p.isUnpacking()) {
400     recvCount = 0;
401 #if CMK_CONVERSE_MPI
402     void pingBuddy();
403     void pingCheckHandler();
404     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
405     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
406 #endif
407   }
408 }
409
410 // called by checkpoint mgr to restore an array element
411 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
412 {
413 #if CMK_MEM_CHECKPOINT
414   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
415   // m->index.print();
416   PUP::fromMem p(m->packData);
417   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
418   CmiAssert(mgr);
419 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
420   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
421 #else
422   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
423 #endif
424
425   // find a list of array elements bound together
426   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
427   CmiAssert(elt);
428   CkLocRec_local *rec = elt->myRec;
429   CkVec<CkMigratable *> list;
430   mgr->migratableList(rec, list);
431   CmiAssert(list.length() > 0);
432   for (int l=0; l<list.length(); l++) {
433     elt = (ArrayElement *)list[l];
434     elt->budPEs[0] = m->bud1;
435     elt->budPEs[1] = m->bud2;
436     //    reset, may not needed now
437     // for now.
438     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
439       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
440       if (c) c->redNo = 0;
441     }
442   }
443 #endif
444 }
445
446 // return 1 if pe is a crashed processor
447 int CkMemCheckPT::isFailed(int pe)
448 {
449   for (int i=0; i<failedPes.length(); i++)
450     if (failedPes[i] == pe) return 1;
451   return 0;
452 }
453
454 // add pe into history list of all failed processors
455 void CkMemCheckPT::failed(int pe)
456 {
457   if (isFailed(pe)) return;
458   failedPes.push_back(pe);
459 }
460
461 int CkMemCheckPT::totalFailed()
462 {
463   return failedPes.length();
464 }
465
466 // create an checkpoint entry for array element of aid with index.
467 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
468 {
469   // error check, no duplicate
470   int idx, len = ckTable.size();
471   for (idx=0; idx<len; idx++) {
472     CkCheckPTInfo *entry = ckTable[idx];
473     if (index == entry->index) {
474       if (loc == entry->locMgr) {
475           // bindTo array elements
476           return;
477       }
478         // for array inheritance, the following check may fail
479         // because ArrayElement constructor of all superclasses are called
480       if (aid == entry->aid) {
481         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
482         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
483       }
484     }
485   }
486   CkCheckPTInfo *newEntry;
487   if (where == CkCheckPoint_inMEM)
488     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
489   else
490     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
491   ckTable.push_back(newEntry);
492   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
493 }
494
495 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
496 {
497 #if !CMK_CHKP_ALL       
498   int buddy = msg->bud1;
499   if (buddy == CkMyPe()) buddy = msg->bud2;
500   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
501   recvData(msg);
502     // ack
503   thisProxy[buddy].gotData();
504 #else
505   chkpTable[0] = NULL;
506   chkpTable[1] = NULL;
507   recvArrayCheckpoint(msg);
508   thisProxy[msg->bud2].gotData();
509 #endif
510 }
511
512 // loop through my checkpoint table and ask checkpointed array elements
513 // to send me checkpoint data.
514 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
515 {
516   checkpointed = 1;
517   cpCallback = cb;
518   cpStarter = starter;
519   if (CkMyPe() == cpStarter) {
520     startTime = CmiWallTimer();
521     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
522   }
523 #if CMK_CONVERSE_MPI
524   if(CmiNumPartition()==1)
525 #endif    
526   {
527
528 #if !CMK_CHKP_ALL
529   int len = ckTable.length();
530   for (int i=0; i<len; i++) {
531     CkCheckPTInfo *entry = ckTable[i];
532       // always let the bigger number processor send request
533     //if (CkMyPe() < entry->pNo) continue;
534       // always let the smaller number processor send request, may on same proc
535     if (!isMaster(entry->pNo)) continue;
536       // call inmem_checkpoint to the array element, ask it to send
537       // back checkpoint data via recvData().
538     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
539     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
540   }
541     // if my table is empty, then I am done
542   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
543 #else
544   startArrayCheckpoint();
545 #endif
546         sendProcData();
547   }
548 #if CMK_CONVERSE_MPI
549   else
550   {
551         startCheckpoint();
552   }
553 #endif
554   // pack and send proc level data
555 }
556
557 class MemElementPacker : public CkLocIterator{
558         private:
559                 CkLocMgr *locMgr;
560                 PUP::er &p;
561         public:
562                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
563                 void addLocation(CkLocation &loc){
564                         CkArrayIndexMax idx = loc.getIndex();
565                         CkGroupID gID = locMgr->ckGetGroupID();
566                         p|gID;
567                         p|idx;
568                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
569                 }
570 };
571
572 void CkMemCheckPT::pupAllElements(PUP::er &p){
573 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
574         int numElements;
575         if(!p.isUnpacking()){
576                 numElements = CkCountArrayElements();
577         }
578         p | numElements;
579         if(!p.isUnpacking()){
580                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
581         }
582 #endif
583 }
584
585 void CkMemCheckPT::startArrayCheckpoint(){
586 #if CMK_CHKP_ALL
587         int size;
588         {
589                 PUP::sizer psizer;
590                 pupAllElements(psizer);
591                 size = psizer.size();
592         }
593         int packSize = size/sizeof(double)+1;
594  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
595         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
596         msg->len = size;
597         msg->cp_flag = 1;
598         int budPEs[2];
599         msg->bud1=CkMyPe();
600         msg->bud2=ChkptOnPe(CkMyPe());
601         {
602                 PUP::toMem p(msg->packData);
603                 pupAllElements(p);
604         }
605         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
606         if(chkpTable[0]) delete chkpTable[0];
607         chkpTable[0] = msg;
608         //send the checkpoint to my 
609         recvCount++;
610 #endif
611 }
612
613 void CkMemCheckPT::startCheckpoint(){
614 #if CMK_CONVERSE_MPI
615         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
616     CkPrintf("in start checkpointing!!!!\n");
617   int size;
618   {
619     PUP::sizer p;
620     _handleProcData(p,CmiFalse);
621     size = p.size();
622   }
623         int packSize = size/sizeof(double)+1;
624         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
625         procMsg->len = size;
626         procMsg->cp_flag = 1;
627         {
628                 PUP::toMem p(procMsg->packData);
629                 _handleProcData(p,CmiFalse);
630         }
631         int pointer = CpvAccess(curPointer);
632         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
633                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
634         
635         {
636                 PUP::sizer psizer;
637                 pupAllElements(psizer);
638                 size = psizer.size();
639         }
640         packSize = size/sizeof(double)+1;
641         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
642         msg->len = size;
643         msg->cp_flag = 1;
644         {
645                 PUP::toMem p(msg->packData);
646                 pupAllElements(p);
647         }
648         pointer = CpvAccess(curPointer);
649         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
650     CkPrintf("start checkpointing!!!!\n");
651         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
652                 CpvAccess(chkpBuf)[pointer] = msg;
653         if(CkReplicaAlive()==1){
654                 CpvAccess(recvdLocal) = 1;
655                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
656                 CkPackMessage(&env);
657                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
658                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
659         }
660         if(CpvAccess(recvdRemote)==1){
661                 //compare the checkpoint 
662           int size = CpvAccess(chkpBuf)[pointer]->len;
663           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
664                 thisProxy[CkMyPe()].doneComparison(true);
665           }else{
666                 thisProxy[CkMyPe()].doneComparison(true);
667           }
668   }
669         else{
670                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
671                         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
672                                 int pointer = CpvAccess(curPointer);
673                                 //send the proc data
674                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
675                                 procMsg->pointer = pointer;
676                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
677                                 CkPackMessage(&env);
678                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
679                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
680                                 CkPrintf("[%d] sendProcdata\n",CkMyPe());
681                         }
682                         //send the array checkpoint data
683                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
684                         msg->pointer = CpvAccess(curPointer);
685                         envelope * env = (envelope *)(UsrToEnv(msg));
686                         CkPackMessage(&env);
687                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
688                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
689                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
690                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
691                         //can continue work, no need to wait for my replica
692                 }
693         }
694 #endif
695 }
696
697 void CkMemCheckPT::doneComparison(bool ret){
698         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
699         contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
700 }
701
702 void CkMemCheckPT::doneRComparison(bool ret){
703         CpvAccess(recvdRemote) = 0;
704         CpvAccess(recvdLocal) = 0;
705 //      if(CpvAccess(curPointer) == 0){
706         if(ret==true){
707         CpvAccess(curPointer)^=1;
708                 if(CkMyPe() == 0){
709       CkPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
710                         cpCallback.send();
711                 }
712         }else{
713                 RollBack();
714         }
715 }
716
717 void CkMemCheckPT::RollBack(){
718         //restore group data
719         CkMemCheckPT::inRestarting = 1;
720         int pointer = CpvAccess(curPointer)^1;//use the previous one
721     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
722         PUP::fromMem p(chkpMsg->packData);      
723         //_handleProcData(p);
724         
725         //destroy array elements
726         //CKLOCMGR_LOOP(mgr->flushLocalRecs(););
727           int numGroups = CkpvAccess(_groupIDTable)->size();
728           for(int i=0;i<numGroups;i++) {
729                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
730                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
731                 obj->flushStates();
732                 obj->ckJustMigrated();
733           }
734         //restore array elements
735         
736         int numElements;
737         p|numElements;
738         
739         if(p.isUnpacking()){
740                 for(int i=0;i<numElements;i++){
741                 //for(int i=0;i<1;i++){
742                         CkGroupID gID;
743                         CkArrayIndex idx;
744                         p|gID;
745                         p|idx;
746                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
747                         CmiAssert(mgr);
748                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
749                 }
750         }
751         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
752         contribute(cb);
753 }
754
755 void CkMemCheckPT::notifyReplicaDie(int pe){
756         CkPrintf("[%d] receive replica die\n",CkMyPe());
757         replicaAlive = 0;
758         CpvAccess(_remoteCrashedNode) = pe;
759 }
760
761 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
762 {
763 #if CMK_CHKP_ALL
764         int idx = 1;
765         if(msg->bud1 == CkMyPe()){
766                 idx = 0;
767         }
768         int isChkpting = msg->cp_flag;
769         if(isChkpting == 1){
770                 if(chkpTable[idx]) delete chkpTable[idx];
771         }
772         chkpTable[idx] = msg;
773         if(isChkpting){
774                 recvCount++;
775                 if(recvCount == 2){
776                   if (where == CkCheckPoint_inMEM) {
777                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
778                   }
779                   else if (where == CkCheckPoint_inDISK) {
780                         // another barrier for finalize the writing using fsync
781                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
782                         contribute(0,NULL,CkReduction::sum_int,localcb);
783                   }
784                   else
785                         CmiAbort("Unknown checkpoint scheme");
786                   recvCount = 0;
787                 }
788         }
789 #endif
790 }
791
792 // don't handle array elements
793 static inline void _handleProcData(PUP::er &p, CmiBool create)
794 {
795     // save readonlys, and callback BTW
796     CkPupROData(p);
797
798     // save mainchares 
799     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
800         
801 #ifndef CMK_CHARE_USE_PTR
802     // save non-migratable chare
803     CkPupChareData(p);
804 #endif
805
806     // save groups into Groups.dat
807     CkPupGroupData(p,create);
808
809     // save nodegroups into NodeGroups.dat
810     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
811 }
812
813 void CkMemCheckPT::sendProcData()
814 {
815   // find out size of buffer
816   int size;
817   {
818     PUP::sizer p;
819     _handleProcData(p,CmiTrue);
820     size = p.size();
821   }
822   int packSize = size;
823   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
824   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
825   {
826     PUP::toMem p(msg->packData);
827     _handleProcData(p,CmiTrue);
828   }
829   msg->pe = CkMyPe();
830   msg->len = size;
831   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
832   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
833 }
834
835 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
836 {
837   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
838   CpvAccess(procChkptBuf) = msg;
839   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
840   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
841 }
842
843 // ArrayElement call this function to give us the checkpointed data
844 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
845 {
846   int len = ckTable.length();
847   int idx;
848   for (idx=0; idx<len; idx++) {
849     CkCheckPTInfo *entry = ckTable[idx];
850     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
851   }
852   CkAssert(idx < len);
853   int isChkpting = msg->cp_flag;
854   ckTable[idx]->updateBuffer(msg);
855   if (isChkpting) {
856       // all my array elements have returned their inmem data
857       // inform starter processor that I am done.
858     recvCount ++;
859     if (recvCount == ckTable.length()) {
860       if (where == CkCheckPoint_inMEM) {
861         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
862       }
863       else if (where == CkCheckPoint_inDISK) {
864         // another barrier for finalize the writing using fsync
865         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
866         contribute(0,NULL,CkReduction::sum_int,localcb);
867       }
868       else
869         CmiAbort("Unknown checkpoint scheme");
870       recvCount = 0;
871     } 
872   }
873 }
874
875 // only used in disk checkpointing
876 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
877 {
878   delete m;
879 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
880   system("sync");
881 #endif
882   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
883 }
884
885 // only is called on cpStarter when checkpoint is done
886 void CkMemCheckPT::cpFinish()
887 {
888   CmiAssert(CkMyPe() == cpStarter);
889   peCount++;
890     // now that all processors have finished, activate callback
891   if (peCount == 2) 
892 {
893     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
894     cpCallback.send();
895     peCount = 0;
896     thisProxy.report();
897   }
898 }
899
900 // for debugging, report checkpoint info
901 void CkMemCheckPT::report()
902 {
903 #if !CMK_CHKP_ALL
904   int objsize = 0;
905   int len = ckTable.length();
906   for (int i=0; i<len; i++) {
907     CkCheckPTInfo *entry = ckTable[i];
908     CmiAssert(entry);
909     objsize += entry->getSize();
910   }
911   CmiAssert(CpvAccess(procChkptBuf));
912   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
913 #else
914   if(CkMyPe()==0)
915   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
916 #endif
917 }
918
919 /*****************************************************************************
920                         RESTART Procedure
921 *****************************************************************************/
922
923 // master processor of two buddies
924 inline int CkMemCheckPT::isMaster(int buddype)
925 {
926 #if 0
927   int mype = CkMyPe();
928 //CkPrintf("ismaster: %d %d\n", pe, mype);
929   if (CkNumPes() - totalFailed() == 2) {
930     return mype > buddype;
931   }
932   for (int i=1; i<CkNumPes(); i++) {
933     int me = (buddype+i)%CkNumPes();
934     if (isFailed(me)) continue;
935     if (me == mype) return 1;
936     else return 0;
937   }
938   return 0;
939 #else
940     // smaller one
941   int mype = CkMyPe();
942 //CkPrintf("ismaster: %d %d\n", pe, mype);
943   if (CkNumPes() - totalFailed() == 2) {
944     return mype < buddype;
945   }
946 #if NODE_CHECKPOINT
947   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
948   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
949 #else
950   for (int i=1; i<CkNumPes(); i++) {
951 #endif
952     int me = (mype+i)%CkNumPes();
953     if (isFailed(me)) continue;
954     if (me == buddype) return 1;
955     else return 0;
956   }
957   return 0;
958 #endif
959 }
960
961
962
963 #if 0
964 // helper class to pup all elements that belong to same ckLocMgr
965 class ElementDestoryer : public CkLocIterator {
966 private:
967         CkLocMgr *locMgr;
968 public:
969         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
970         void addLocation(CkLocation &loc) {
971                 CkArrayIndex idx=loc.getIndex();
972                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
973                 loc.destroy();
974         }
975 };
976 #endif
977
978 // restore the bitmap vector for LB
979 void CkMemCheckPT::resetLB(int diepe)
980 {
981 #if CMK_LBDB_ON
982   int i;
983   char *bitmap = new char[CkNumPes()];
984   // set processor available bitmap
985   get_avail_vector(bitmap);
986
987   for (i=0; i<failedPes.length(); i++)
988     bitmap[failedPes[i]] = 0; 
989   bitmap[diepe] = 0;
990
991 #if CK_NO_PROC_POOL
992   set_avail_vector(bitmap);
993 #endif
994
995   // if I am the crashed pe, rebuild my failedPEs array
996   if (CkMyNode() == diepe)
997     for (i=0; i<CkNumPes(); i++) 
998       if (bitmap[i]==0) failed(i);
999
1000   delete [] bitmap;
1001 #endif
1002 }
1003
1004 static double restartT;
1005
1006 // in case when failedPe dies, everybody go through its checkpoint table:
1007 // destory all array elements
1008 // recover lost buddies
1009 // reconstruct all array elements from check point data
1010 // called on all processors
1011 void CkMemCheckPT::restart(int diePe)
1012 {
1013 #if CMK_MEM_CHECKPOINT
1014   double curTime = CmiWallTimer();
1015   if (CkMyPe() == diePe){
1016            restartT = CmiWallTimer();
1017     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1018   }
1019   stage = (char*)"resetLB";
1020   startTime = curTime;
1021   if (CkMyPe() == diePe)
1022     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1023
1024 #if CK_NO_PROC_POOL
1025   failed(diePe);        // add into the list of failed pes
1026 #endif
1027   thisFailedPe = diePe;
1028
1029   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1030
1031   inRestarting = 1;
1032                                                                                 
1033   // disable load balancer's barrier
1034   if (CkMyPe() != diePe) resetLB(diePe);
1035
1036   CKLOCMGR_LOOP(mgr->startInserting(););
1037
1038
1039   if(CmiNumPartition()==1){
1040         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1041   }else{
1042         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1043   }
1044 #endif
1045 }
1046
1047 // loally remove all array elements
1048 void CkMemCheckPT::removeArrayElements()
1049 {
1050 #if CMK_MEM_CHECKPOINT
1051   int len = ckTable.length();
1052   double curTime = CmiWallTimer();
1053   if (CkMyPe() == thisFailedPe) 
1054     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1055   stage = (char*)"removeArrayElements";
1056   startTime = curTime;
1057
1058   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
1059   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1060
1061   // get rid of all buffering and remote recs
1062   // including destorying all array elements
1063 #if CK_NO_PROC_POOL  
1064         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1065 #else
1066         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1067 #endif
1068   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1069 #endif
1070 }
1071
1072 // flush state in reduction manager
1073 void CkMemCheckPT::resetReductionMgr()
1074 {
1075   if (CkMyPe() == thisFailedPe) 
1076     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1077   int numGroups = CkpvAccess(_groupIDTable)->size();
1078   for(int i=0;i<numGroups;i++) {
1079     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1080     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1081     obj->flushStates();
1082     obj->ckJustMigrated();
1083   }
1084   // reset again
1085   //CpvAccess(_qd)->flushStates();
1086   if(CmiNumPartition()==1){
1087         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1088   }
1089   else
1090         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1091 }
1092
1093 // recover the lost buddies
1094 void CkMemCheckPT::recoverBuddies()
1095 {
1096   int idx;
1097   int len = ckTable.length();
1098   // ready to flush reduction manager
1099   // cannot be CkMemCheckPT::restart because destory will modify states
1100   double curTime = CmiWallTimer();
1101   if (CkMyPe() == thisFailedPe)
1102   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1103   stage = (char *)"recoverBuddies";
1104   if (CkMyPe() == thisFailedPe)
1105   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1106   startTime = curTime;
1107
1108   // recover buddies
1109   expectCount = 0;
1110 #if !CMK_CHKP_ALL
1111   for (idx=0; idx<len; idx++) {
1112     CkCheckPTInfo *entry = ckTable[idx];
1113     if (entry->pNo == thisFailedPe) {
1114 #if CK_NO_PROC_POOL
1115       // find a new buddy
1116       int budPe = BuddyPE(CkMyPe());
1117 #else
1118       int budPe = thisFailedPe;
1119 #endif
1120       CkArrayCheckPTMessage *msg = entry->getCopy();
1121       msg->bud1 = budPe;
1122       msg->bud2 = CkMyPe();
1123       msg->cp_flag = 0;            // not checkpointing
1124       thisProxy[budPe].recoverEntry(msg);
1125       expectCount ++;
1126     }
1127   }
1128 #else
1129   //send to failed pe
1130   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1131 #if CK_NO_PROC_POOL
1132       // find a new buddy
1133       int budPe = BuddyPE(CkMyPe());
1134 #else
1135       int budPe = thisFailedPe;
1136 #endif
1137       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1138       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1139           msg->cp_flag = 0;            // not checkpointing
1140       msg->bud1 = budPe;
1141       msg->bud2 = CkMyPe();
1142       thisProxy[budPe].recoverEntry(msg);
1143       expectCount ++;
1144   }
1145 #endif
1146
1147   if (expectCount == 0) {
1148     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1149   }
1150   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1151 }
1152
1153 void CkMemCheckPT::gotData()
1154 {
1155   ackCount ++;
1156   if (ackCount == expectCount) {
1157     ackCount = 0;
1158     expectCount = -1;
1159     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1160     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1161   }
1162 }
1163
1164 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1165 {
1166
1167   for (int i=0; i<n; i++) {
1168     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1169     mgr->updateLocation(idx[i], nowOnPe);
1170   }
1171         thisProxy[nowOnPe].gotReply();
1172 }
1173
1174 // restore array elements
1175 void CkMemCheckPT::recoverArrayElements()
1176 {
1177   double curTime = CmiWallTimer();
1178   int len = ckTable.length();
1179   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1180   stage = (char *)"recoverArrayElements";
1181   if (CkMyPe() == thisFailedPe)
1182   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1183   startTime = curTime;
1184  int flag = 0;
1185   // recover all array elements
1186   int count = 0;
1187
1188 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1189   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1190   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1191 #endif
1192
1193 #if !CMK_CHKP_ALL
1194   for (int idx=0; idx<len; idx++)
1195   {
1196     CkCheckPTInfo *entry = ckTable[idx];
1197 #if CK_NO_PROC_POOL
1198     // the bigger one will do 
1199 //    if (CkMyPe() < entry->pNo) continue;
1200     if (!isMaster(entry->pNo)) continue;
1201 #else
1202     // smaller one do it, which has the original object
1203     if (CkMyPe() == entry->pNo+1 || 
1204         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1205 #endif
1206 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1207
1208     entry->updateBuddy(CkMyPe(), entry->pNo);
1209     CkArrayCheckPTMessage *msg = entry->getCopy();
1210     // gzheng
1211     //thisProxy[CkMyPe()].inmem_restore(msg);
1212     inmem_restore(msg);
1213 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1214     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1215     int homePe = mgr->homePe(msg->index);
1216     if (homePe != CkMyPe()) {
1217       gmap[homePe].push_back(msg->locMgr);
1218       imap[homePe].push_back(msg->index);
1219     }
1220 #endif
1221     CkFreeMsg(msg);
1222     count ++;
1223   }
1224 #else
1225         double * packData;
1226         if(CmiNumPartition()==1){
1227                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1228                 packData = msg->packData;
1229         }
1230         else{
1231                 int pointer = CpvAccess(curPointer);
1232                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1233                 packData = msg->packData;
1234         }
1235 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1236         recoverAll(packData,gmap,imap);
1237 #else
1238         recoverAll(packData);
1239 #endif
1240 #endif
1241   curTime = CmiWallTimer();
1242   //if (CkMyPe() == thisFailedPe)
1243   if (CkMyPe() == 0)
1244         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1245 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1246   for (int i=0; i<CkNumPes(); i++) {
1247     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1248       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1249         flag++; 
1250         }
1251   }
1252   delete [] imap;
1253   delete [] gmap;
1254 #endif
1255   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1256
1257   CKLOCMGR_LOOP(mgr->doneInserting(););
1258
1259   // _crashedNode = -1;
1260   CpvAccess(_crashedNode) = -1;
1261   inRestarting = 0;
1262 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1263   if (CkMyPe() == 0)
1264     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1265 #else
1266 if(flag == 0)
1267 {
1268     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1269 }
1270 #endif
1271 }
1272
1273 void CkMemCheckPT::gotReply(){
1274     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1275 }
1276
1277 void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1278 #if CMK_CHKP_ALL
1279         PUP::fromMem p(packData);
1280         int numElements;
1281         p|numElements;
1282         if(p.isUnpacking()){
1283                 for(int i=0;i<numElements;i++){
1284                         CkGroupID gID;
1285                         CkArrayIndex idx;
1286                         p|gID;
1287                         p|idx;
1288                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1289                         int homePe = mgr->homePe(idx);
1290 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1291                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1292 #else
1293                         if(CmiNumPartition()==1)
1294                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1295                         else{
1296                                 if(CkMyPe()==thisFailedPe){
1297                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1298                                 }
1299         else{
1300                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
1301                                 }
1302                         }
1303 #endif
1304 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1305                         homePe = mgr->homePe(idx);
1306                         if (homePe != CkMyPe()) {
1307                           gmap[homePe].push_back(gID);
1308                           imap[homePe].push_back(idx);
1309                         }
1310 #endif
1311                 }
1312         }
1313 #endif
1314 }
1315
1316
1317 // on every processor
1318 // turn load balancer back on
1319 void CkMemCheckPT::finishUp()
1320 {
1321   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1322   //CKLOCMGR_LOOP(mgr->doneInserting(););
1323   
1324   if (CkMyPe() == thisFailedPe)
1325   {
1326        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1327        cpCallback.send();
1328        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1329   }
1330         
1331   if(CmiNumPartition()!=1){
1332         CpvAccess(recvdProcChkp) = 0;
1333         CpvAccess(recvdArrayChkp) = 0;
1334         CpvAccess(curPointer)^=1;
1335    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1336          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1337    }
1338         //notify my replica, restart is done
1339    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1340    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1341    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1342   }
1343
1344 #if CK_NO_PROC_POOL
1345 #if NODE_CHECKPOINT
1346   int numnodes = CmiNumPhysicalNodes();
1347 #else
1348   int numnodes = CkNumPes();
1349 #endif
1350   if (numnodes-totalFailed() <=2) {
1351     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1352     _memChkptOn = 0;
1353   }
1354 #endif
1355 }
1356
1357 void CkMemCheckPT::recoverFromSoftFailure()
1358 {
1359         inRestarting = 0;
1360         if(CkMyPe()==0)
1361         cpCallback.send();
1362 }
1363 // called only on 0
1364 void CkMemCheckPT::quiescence(CkCallback &cb)
1365 {
1366   static int pe_count = 0;
1367   pe_count ++;
1368   CmiAssert(CkMyPe() == 0);
1369   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1370   if (pe_count == CkNumPes()) {
1371     pe_count = 0;
1372     cb.send();
1373   }
1374 }
1375
1376 // User callable function - to start a checkpoint
1377 // callback cb is used to pass control back
1378 void CkStartMemCheckpoint(CkCallback &cb)
1379 {
1380 #if CMK_MEM_CHECKPOINT
1381         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1382   if (_memChkptOn == 0) {
1383     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1384     cb.send();
1385     return;
1386   }
1387   if (CkInRestarting()) {
1388       // trying to checkpointing during restart
1389     cb.send();
1390     return;
1391   }
1392     // store user callback and user data
1393   CkMemCheckPT::cpCallback = cb;
1394
1395     // broadcast to start check pointing
1396   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1397   checkptMgr.doItNow(CkMyPe(), cb);
1398 #else
1399   // when mem checkpoint is disabled, invike cb immediately
1400   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1401   cb.send();
1402 #endif
1403 }
1404
1405 void CkRestartCheckPoint(int diePe)
1406 {
1407 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1408   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1409   // broadcast
1410   checkptMgr.restart(diePe);
1411 }
1412
1413 static int _diePE = -1;
1414
1415 // callback function used locally by ccs handler
1416 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1417 {
1418 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1419   CkRestartCheckPoint(_diePE);
1420 }
1421
1422
1423 // called on crashed PE
1424 static void restartBeginHandler(char *msg)
1425 {
1426   CmiFree(msg);
1427 #if CMK_MEM_CHECKPOINT
1428 #if CMK_USE_BARRIER
1429         if(CkMyPe()!=_diePE){
1430                 printf("restar begin on %d\n",CkMyPe());
1431                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1432                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1433                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1434         }else{
1435         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1436         CkRestartCheckPointCallback(NULL, NULL);
1437         }
1438 #else
1439   static int count = 0;
1440   CmiAssert(CkMyPe() == _diePE);
1441   count ++;
1442   if (count == CkNumPes()) {
1443           printf("restart begin on %d\n",CkMyPe());
1444     CkRestartCheckPointCallback(NULL, NULL);
1445     count = 0;
1446   }
1447 #endif
1448 #endif
1449 }
1450
1451 extern void _discard_charm_message();
1452 extern void _resume_charm_message();
1453
1454 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1455         return data;
1456 }
1457
1458 static void restartBcastHandler(char *msg)
1459 {
1460 #if CMK_MEM_CHECKPOINT
1461   // advance phase counter
1462   CkMemCheckPT::inRestarting = 1;
1463   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1464   // gzheng
1465   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1466
1467   if (CkMyPe()==_diePE)
1468     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1469
1470   // reset QD counters
1471 /*  gzheng
1472   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1473 */
1474
1475 /*  gzheng
1476   if (CkMyPe()==_diePE)
1477       CkRestartCheckPointCallback(NULL, NULL);
1478 */
1479   CmiFree(msg);
1480
1481   _resume_charm_message();
1482
1483     // reduction
1484   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1485   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1486 #if CMK_USE_BARRIER
1487         //CmiPrintf("before reduce\n"); 
1488         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1489         //CmiPrintf("after reduce\n");  
1490 #else
1491   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1492 #endif 
1493  checkpointed = 0;
1494 #endif
1495 }
1496
1497 extern void _initDone();
1498
1499 bool compare(char * buf1, char *buf2){
1500         return true;
1501 }
1502
1503 static void recvRemoteChkpHandler(char *msg){
1504    envelope *env = (envelope *)msg;
1505    CkUnpackMessage(&env);
1506    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1507   if(CpvAccess(recvdLocal)==1){
1508           int pointer = CpvAccess(curPointer);
1509           int size = CpvAccess(chkpBuf)[pointer]->len;
1510           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1511           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1512                 
1513                 checkptMgr[CkMyPe()].doneComparison(true);
1514                 }else
1515                 {
1516                         checkptMgr[CkMyPe()].doneComparison(false);
1517                 }
1518   }else{
1519           CpvAccess(recvdRemote) = 1;
1520           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1521           CpvAccess(buddyBuf) = chkpMsg;
1522   }  
1523 }
1524
1525 static void replicaRecoverHandler(char *msg){
1526         CpvAccess(_remoteCrashedNode) = -1;
1527         CkMemCheckPT::replicaAlive = 1;
1528     bool ret = true;
1529         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1530         checkptMgr[CkMyPe()].doneComparison(ret);
1531 }
1532
1533 static void replicaDieHandler(char * msg){
1534         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1535         CpvAccess(_remoteCrashedNode) = diePe;
1536         CkMemCheckPT::replicaAlive = 0;
1537         find_spare_mpirank(diePe,CmiMyPartition()^1);
1538     if(CkMyPe()==diePe){
1539       CkPrintf("pe %d in replicad word die\n",diePe);
1540             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1541     }
1542         //broadcast to my partition
1543         //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1544         //checkptMgr.notifyReplicaDie(diePe);
1545     //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1546     //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1547 }
1548
1549
1550 static void replicaDieBcastHandler(char *msg){
1551         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1552         CpvAccess(_remoteCrashedNode) = diePe;
1553         CkMemCheckPT::replicaAlive = 0;
1554 }
1555
1556 static void recoverRemoteProcDataHandler(char *msg){
1557    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1558    envelope *env = (envelope *)msg;
1559    CkUnpackMessage(&env);
1560    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1561         
1562    //store the checkpoint
1563         int pointer = procMsg->pointer;
1564
1565         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1566         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1567
1568    PUP::fromMem p(procMsg->packData);
1569    _handleProcData(p,CmiTrue);
1570    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1571    CKLOCMGR_LOOP(mgr->startInserting(););
1572    
1573    _initDone();
1574    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1575    CpvAccess(recvdProcChkp) =1;
1576         if(CpvAccess(recvdArrayChkp)==1){
1577                 _resume_charm_message();
1578                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1579                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1580                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1581         }
1582 }
1583
1584 static void recoverRemoteArrayDataHandler(char *msg){
1585   if(CkMyPe()==CpvAccess(_crashedNode)) 
1586   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1587    envelope *env = (envelope *)msg;
1588    CkUnpackMessage(&env);
1589    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1590         
1591    //store the checkpoint
1592         int pointer = chkpMsg->pointer;
1593         CpvAccess(curPointer) = pointer;
1594         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1595         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1596    CpvAccess(recvdArrayChkp) =1;
1597         CkMemCheckPT::inRestarting = 1;
1598         if(CmiMyPe()!=CpvAccess(_crashedNode)||CpvAccess(recvdProcChkp) == 1){
1599                 _resume_charm_message();
1600                 _diePE = CpvAccess(_crashedNode);
1601                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1602                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1603                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1604                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1605         }
1606 }
1607
1608 static void recvPhaseHandler(char * msg)
1609 {
1610         CpvAccess(_curRestartPhase)--;
1611         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1612   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1613   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1614   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1615 //              CmiPrintf("start ping check handler\n");
1616 //       }
1617         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1618   // }
1619    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1620    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1621
1622 }
1623 // called on crashed processor
1624 static void recoverProcDataHandler(char *msg)
1625 {
1626 #if CMK_MEM_CHECKPOINT
1627    int i;
1628    envelope *env = (envelope *)msg;
1629    CkUnpackMessage(&env);
1630    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1631    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1632    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1633    PUP::fromMem p(procMsg->packData);
1634    _handleProcData(p,CmiTrue);
1635
1636    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1637    // gzheng
1638    CKLOCMGR_LOOP(mgr->startInserting(););
1639
1640    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1641    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1642    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1643    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1644
1645    _initDone();
1646 //   CpvAccess(_qd)->flushStates();
1647    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1648 #endif
1649 }
1650 //for replica, got the phase number from my neighbor processor in the current partition
1651 static void askPhaseHandler(char *msg)
1652 {
1653 #if CMK_MEM_CHECKPOINT
1654         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1655     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1656         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1657     CmiSetHandler(msg, recvPhaseHandlerIdx);
1658         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1659 #endif
1660 }
1661 // called on its backup processor
1662 // get backup message buffer and sent to crashed processor
1663 static void askProcDataHandler(char *msg)
1664 {
1665 #if CMK_MEM_CHECKPOINT
1666     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1667     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1668     if (CpvAccess(procChkptBuf) == NULL)  {
1669       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1670       CkAbort("no checkpoint found");
1671     }
1672     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1673     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1674
1675     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1676
1677     CkPackMessage(&env);
1678     CmiSetHandler(env, recoverProcDataHandlerIdx);
1679     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1680     CpvAccess(procChkptBuf) = NULL;
1681     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1682 #endif
1683 }
1684
1685 // called on PE 0
1686 void qd_callback(void *m)
1687 {
1688    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1689    CkFreeMsg(m);
1690 #ifdef CMK_SMP
1691    for(int i=0;i<CmiMyNodeSize();i++){
1692    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1693    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1694         CmiSetHandler(msg, askProcDataHandlerIdx);
1695         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1696         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1697    }
1698    return;
1699 #endif
1700    if(CmiNumPartition()==1){
1701            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1702            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1703            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1704            CmiSetHandler(msg, askProcDataHandlerIdx);
1705            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1706            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1707         }
1708    else{
1709                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1710                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1711                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1712    }
1713 }
1714
1715 // on crashed node
1716 void CkMemRestart(const char *dummy, CkArgMsg *args)
1717 {
1718 #if CMK_MEM_CHECKPOINT
1719    _diePE = CmiMyNode();
1720    CpvAccess( _crashedNode )= CmiMyNode();
1721    CkMemCheckPT::inRestarting = 1;
1722    _discard_charm_message();
1723   
1724   /*if(CmiMyRank()==0){
1725     CkCallback cb(qd_callback);
1726     CkStartQD(cb);
1727     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1728   }*/
1729    if(CmiNumPartition()==1){
1730            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1731   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1732                 restartT = CmiWallTimer();
1733            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1734            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1735            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1736            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1737            CmiSetHandler(msg, askProcDataHandlerIdx);
1738            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1739            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1740    }
1741    else{
1742                 CkCallback cb(qd_callback);
1743                 CkStartQD(cb);
1744         //      CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1745            //get the restart phase
1746            //CmiPrintf("[%d] diePe asks for phase\n",CkMyPe());
1747            //char *phaseMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1748            //*(int *)(phaseMsg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1749            //CmiSetHandler(phaseMsg, askPhaseHandlerIdx);
1750            //int pe = ChkptOnPe(CpvAccess(_crashedNode));
1751            //CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)phaseMsg);
1752                 //notify the other partition
1753            //CkPrintf("[%d] notify remote partition %d\n",CkMyPe(),CmiMyPartition());
1754            //char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1755            //*(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1756            //CmiSetHandler(msg,replicaDieHandlerIdx);
1757            //CmiRemoteSyncSendAndFree(CkMyPe(),0,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
1758    }
1759 #else
1760    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1761 #endif
1762 }
1763
1764 // can be called in other files
1765 // return true if it is in restarting
1766 extern "C"
1767 int CkInRestarting()
1768 {
1769 #if CMK_MEM_CHECKPOINT
1770   if (CpvAccess( _crashedNode)!=-1) return 1;
1771   // gzheng
1772   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1773   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1774   return CkMemCheckPT::inRestarting;
1775 #else
1776   return 0;
1777 #endif
1778 }
1779
1780 extern "C"
1781 int CkReplicaAlive()
1782 {
1783         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1784           CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1785         return CkMemCheckPT::replicaAlive;
1786
1787         /*if(CkMemCheckPT::replicaDead==1)
1788                 return 0;
1789         else            
1790                 return 1;*/
1791 }
1792
1793 extern "C"
1794 void CkSetInLdb(){
1795 #if CMK_MEM_CHECKPOINT
1796         CkMemCheckPT::inLoadbalancing = 1;
1797 #endif
1798 }
1799
1800 extern "C"
1801 int CkInLdb(){
1802 #if CMK_MEM_CHECKPOINT
1803         return CkMemCheckPT::inLoadbalancing;
1804 #endif
1805         return 0;
1806 }
1807
1808 extern "C"
1809 void CkResetInLdb(){
1810 #if CMK_MEM_CHECKPOINT
1811         CkMemCheckPT::inLoadbalancing = 0;
1812 #endif
1813 }
1814
1815 /*****************************************************************************
1816                 module initialization
1817 *****************************************************************************/
1818
1819 static int arg_where = CkCheckPoint_inMEM;
1820
1821 #if CMK_MEM_CHECKPOINT
1822 void init_memcheckpt(char **argv)
1823 {
1824     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1825       arg_where = CkCheckPoint_inDISK;
1826     }
1827
1828         // initiliazing _crashedNode variable
1829         CpvInitialize(int, _crashedNode);
1830         CpvInitialize(int, _remoteCrashedNode);
1831         CpvAccess(_crashedNode) = -1;
1832         CpvAccess(_remoteCrashedNode) = -1;
1833 }
1834 #endif
1835
1836 class CkMemCheckPTInit: public Chare {
1837 public:
1838   CkMemCheckPTInit(CkArgMsg *m) {
1839 #if CMK_MEM_CHECKPOINT
1840     if (arg_where == CkCheckPoint_inDISK) {
1841       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1842     }
1843     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1844     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1845 #endif
1846   }
1847 };
1848
1849 static void notifyHandler(char *msg)
1850 {
1851 #if CMK_MEM_CHECKPOINT
1852   CmiFree(msg);
1853       /* immediately increase restart phase to filter old messages */
1854   CpvAccess(_curRestartPhase) ++;
1855   CpvAccess(_qd)->flushStates();
1856   _discard_charm_message();
1857
1858 #endif
1859 }
1860
1861 extern "C"
1862 void notify_crash(int node)
1863 {
1864 #ifdef CMK_MEM_CHECKPOINT
1865   CpvAccess( _crashedNode) = node;
1866 #ifdef CMK_SMP
1867   for(int i=0;i<CkMyNodeSize();i++){
1868         CpvAccessOther(_crashedNode,i)=node;
1869   }
1870 #endif
1871   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1872   CkMemCheckPT::inRestarting = 1;
1873
1874     // this may be in interrupt handler, send a message to reset QD
1875   int pe = CmiNodeFirst(CkMyNode());
1876   for(int i=0;i<CkMyNodeSize();i++){
1877         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1878         CmiSetHandler(msg, notifyHandlerIdx);
1879         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1880   }
1881 #endif
1882 }
1883
1884 #if CMK_CONVERSE_MPI
1885 extern "C" void (*notify_crash_fn)(int node);
1886
1887
1888 void buddyDieHandler(char *msg)
1889 {
1890 #if CMK_MEM_CHECKPOINT
1891    // notify
1892    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1893    notify_crash(diepe);
1894    // send message to crash pe to let it restart
1895    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1896    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1897    int buddy = obj->BuddyPE(CmiMyPe());
1898    if (buddy == diepe)  {
1899    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1900      CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1901      mpi_restart_crashed(diepe, newrank);
1902    }
1903 #endif
1904 }
1905
1906 void pingHandler(void *msg)
1907 {
1908   lastPingTime = CmiWallTimer();
1909   CmiFree(msg);
1910 }
1911
1912 void pingCheckHandler()
1913 {
1914 #if CMK_MEM_CHECKPOINT
1915   double now = CmiWallTimer();
1916   if (lastPingTime > 0 && now - lastPingTime > 1 && !CkInLdb()) {
1917     int i, pe, buddy;
1918     // tell everyone the buddy dies
1919     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1920     for (i = 1; i < CmiNumPes(); i++) {
1921        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1922        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1923     }
1924     buddy = pe;
1925     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1926     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1927       if (obj->isFailed(pe) || pe == buddy) continue;
1928       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1929       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1930       CmiSetHandler(msg, buddyDieHandlerIdx);
1931       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1932     }*/
1933     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1934     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1935     CmiSetHandler(msg, buddyDieHandlerIdx);
1936     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1937     //send to everyone in the other world
1938     for(int i=0;i<CmiNumPes();i++){
1939       char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1940       *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
1941       CmiSetHandler(rMsg, replicaDieHandlerIdx);
1942       CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
1943     }
1944   }
1945   else 
1946     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1947 #endif
1948 }
1949
1950 void pingBuddy()
1951 {
1952 #if CMK_MEM_CHECKPOINT
1953   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1954   if (obj) {
1955     int buddy = obj->BuddyPE(CkMyPe());
1956     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1957     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1958     CmiSetHandler(msg, pingHandlerIdx);
1959     CmiGetRestartPhase(msg) = 9999;
1960     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1961   }
1962   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1963 #endif
1964 }
1965 #endif
1966
1967 // initproc
1968 void CkRegisterRestartHandler( )
1969 {
1970 #if CMK_MEM_CHECKPOINT
1971   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1972   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1973   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1974   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1975   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1976   
1977   //for replica
1978   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
1979   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
1980   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
1981   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
1982   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
1983   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
1984   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
1985   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
1986
1987 #if CMK_CONVERSE_MPI
1988   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1989   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1990   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1991 #endif
1992
1993   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1994   CpvInitialize(CkCheckPTMessage **, chkpBuf);
1995   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
1996   CpvInitialize(CkCheckPTMessage *, buddyBuf);
1997   CpvInitialize(int,curPointer);
1998   CpvInitialize(int,recvdLocal);
1999   CpvInitialize(int,recvdRemote);
2000   CpvInitialize(int,recvdProcChkp);
2001   CpvInitialize(int,recvdArrayChkp);
2002
2003   CpvAccess(procChkptBuf) = NULL;
2004   CpvAccess(buddyBuf) = NULL;
2005   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2006   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2007   CpvAccess(chkpBuf)[0] = NULL;
2008   CpvAccess(chkpBuf)[1] = NULL;
2009   CpvAccess(localProcChkpBuf)[0] = NULL;
2010   CpvAccess(localProcChkpBuf)[1] = NULL;
2011   
2012   CpvAccess(curPointer) = 0;
2013   CpvAccess(recvdLocal) = 0;
2014   CpvAccess(recvdRemote) = 0;
2015   CpvAccess(recvdProcChkp) = 0;
2016   CpvAccess(recvdArrayChkp) = 0;
2017
2018   notify_crash_fn = notify_crash;
2019
2020 #if ! CMK_CONVERSE_MPI
2021   // print pid to kill
2022 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2023 //  sleep(4);
2024 #endif
2025 #endif
2026 }
2027
2028
2029 extern "C"
2030 int CkHasCheckpoints()
2031 {
2032   return checkpointed;
2033 }
2034
2035 /// @todo: the following definitions should be moved to a separate file containing
2036 // structures and functions about fault tolerance strategies
2037
2038 /**
2039  *  * @brief: function for killing a process                                             
2040  *   */
2041 #ifdef CMK_MEM_CHECKPOINT
2042 #if CMK_HAS_GETPID
2043 void killLocal(void *_dummy,double curWallTime){
2044         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2045         if(CmiWallTimer()<killTime-1){
2046                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2047         }else{ 
2048 #if CMK_CONVERSE_MPI
2049                                 CkDieNow();
2050 #else 
2051                 kill(getpid(),SIGKILL);                                               
2052 #endif
2053         }              
2054
2055 #else
2056 void killLocal(void *_dummy,double curWallTime){
2057   CmiAbort("kill() not supported!");
2058 }
2059 #endif
2060 #endif
2061
2062 #ifdef CMK_MEM_CHECKPOINT
2063 /**
2064  * @brief: reads the file with the kill information
2065  */
2066 void readKillFile(){
2067         FILE *fp=fopen(killFile,"r");
2068         if(!fp){
2069                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2070                 return;
2071         }
2072         int proc;
2073         double sec;
2074         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2075                 if(proc == CkMyNode() && CkMyRank() == 0){
2076                         killTime = CmiWallTimer()+sec;
2077                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2078                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2079                 }
2080         }
2081         fclose(fp);
2082 }
2083
2084 #if ! CMK_CONVERSE_MPI
2085 void CkDieNow()
2086 {
2087 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2088          // ignored for non-mpi version
2089         CmiPrintf("[%d] die now.\n", CmiMyPe());
2090         killTime = CmiWallTimer()+0.001;
2091         CcdCallFnAfter(killLocal,NULL,1);
2092 #endif
2093 }
2094 #endif
2095
2096 #endif
2097
2098 #include "CkMemCheckpoint.def.h"
2099
2100