support for lb
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74 CpvDeclare(int, _remoteCrashedNode);
75
76 // static, so that it is accessible from Converse part
77 int CkMemCheckPT::inRestarting = 0;
78 int CkMemCheckPT::inCheckpointing = 0;
79 int CkMemCheckPT::replicaAlive = 1;
80 int CkMemCheckPT::inLoadbalancing = 0;
81 double CkMemCheckPT::startTime;
82 char *CkMemCheckPT::stage;
83 CkCallback CkMemCheckPT::cpCallback;
84
85 int _memChkptOn = 1;                    // checkpoint is on or off
86
87 CkGroupID ckCheckPTGroupID;             // readonly
88
89 static int checkpointed = 0;
90
91 /// @todo the following declarations should be moved into a separate file for all 
92 // fault tolerant strategies
93
94 #ifdef CMK_MEM_CHECKPOINT
95 // name of the kill file that contains processes to be killed 
96 char *killFile;                                               
97 // flag for the kill file         
98 int killFlag=0;
99 // variable for storing the killing time
100 double killTime=0.0;
101 #endif
102
103 #ifdef CKLOCMGR_LOOP
104 #undef CKLOCMGR_LOOP
105 #endif
106 // loop over all CkLocMgr and do "code"
107 #define  CKLOCMGR_LOOP(code)    {       \
108   int numGroups = CkpvAccess(_groupIDTable)->size();    \
109   for(int i=0;i<numGroups;i++) {        \
110     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
111     if(obj->isLocMgr())  {      \
112       CkLocMgr *mgr = (CkLocMgr*)obj;   \
113       code      \
114     }   \
115   }     \
116  }
117
118 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
119 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
120 CpvDeclare(CkCheckPTMessage**, chkpBuf);
121 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
122 //store the checkpoint of the buddy to compare
123 //do not need the whole msg, can be the checksum
124 CpvDeclare(CkCheckPTMessage*, buddyBuf);
125 //pointer of the checkpoint going to be written
126 CpvDeclare(int, curPointer);
127 CpvDeclare(int, recvdRemote);
128 CpvDeclare(int, recvdLocal);
129 CpvDeclare(int, recvdArrayChkp);
130 CpvDeclare(int, recvdProcChkp);
131
132 bool compare(char * buf1, char * buf2);
133 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
134 // Converse function handles
135 static int askPhaseHandlerIdx;
136 static int recvPhaseHandlerIdx;
137 static int askProcDataHandlerIdx;
138 static int restartBcastHandlerIdx;
139 static int recoverProcDataHandlerIdx;
140 static int restartBeginHandlerIdx;
141 static int recvRemoteChkpHandlerIdx;
142 static int replicaDieHandlerIdx;
143 static int replicaDieBcastHandlerIdx;
144 static int replicaRecoverHandlerIdx;
145 static int recoverRemoteProcDataHandlerIdx;
146 static int recoverRemoteArrayDataHandlerIdx;
147 static int notifyHandlerIdx;
148 // compute the backup processor
149 // FIXME: avoid crashed processors
150 #if CMK_CONVERSE_MPI
151 static int pingHandlerIdx;
152 static int pingCheckHandlerIdx;
153 static int buddyDieHandlerIdx;
154 static double lastPingTime = -1;
155
156 extern "C" void mpi_restart_crashed(int pe, int rank);
157 extern "C" int  find_spare_mpirank(int pe,int partition);
158
159 void pingBuddy();
160 void pingCheckHandler();
161
162 #endif
163 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
164
165 inline int CkMemCheckPT::BuddyPE(int pe)
166 {
167   int budpe;
168 #if NODE_CHECKPOINT
169     // buddy is the processor with same rank on the next physical node
170   int r1 = CmiPhysicalRank(pe);
171   int budnode = CmiPhysicalNodeID(pe);
172   do {
173     budnode = (budnode+1)%CmiNumPhysicalNodes();
174     int *pelist;
175     int num;
176     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
177     budpe = pelist[r1 % num];
178   } while (isFailed(budpe));
179   if (budpe == pe) {
180     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
181     CmiAbort("Failed to find a buddy processor");
182   }
183 #else
184   budpe = pe;
185   while (budpe == pe || isFailed(budpe)) 
186           budpe = (budpe+1)%CkNumPes();
187 #endif
188   return budpe;
189 }
190
191 // called in array element constructor
192 // choose and register with 2 buddies for checkpoiting 
193 #if CMK_MEM_CHECKPOINT
194 void ArrayElement::init_checkpt() {
195         if (_memChkptOn == 0) return;
196         if (CkInRestarting()) {
197           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
198         }
199         // only master init checkpoint
200         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
201
202         budPEs[0] = CkMyPe();
203         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
204         CmiAssert(budPEs[0] != budPEs[1]);
205         // inform checkPTMgr
206         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
207         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
208         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
209         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
210 }
211 #endif
212
213 // entry function invoked by checkpoint mgr asking for checkpoint data
214 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
215 #if CMK_MEM_CHECKPOINT
216 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
217 //char index[128];   thisIndexMax.sprint(index);
218 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
219   CkLocMgr *locMgr = thisArray->getLocMgr();
220   CmiAssert(myRec!=NULL);
221   int size;
222   {
223         PUP::sizer p;
224         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
225         size = p.size();
226   }
227   int packSize = size/sizeof(double) +1;
228   CkArrayCheckPTMessage *msg =
229                  new (packSize, 0) CkArrayCheckPTMessage;
230   msg->len = size;
231   msg->index =thisIndexMax;
232   msg->aid = thisArrayID;
233   msg->locMgr = locMgr->getGroupID();
234   msg->cp_flag = 1;
235   {
236         PUP::toMem p(msg->packData);
237         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
238   }
239
240   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
241   checkptMgr.recvData(msg, 2, budPEs);
242   delete m;
243 #endif
244 }
245
246 // checkpoint holder class - for memory checkpointing
247 class CkMemCheckPTInfo: public CkCheckPTInfo
248 {
249   CkArrayCheckPTMessage *ckBuffer;
250 public:
251   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
252                     CkCheckPTInfo(a, loc, idx, pno)
253   {
254     ckBuffer = NULL;
255   }
256   ~CkMemCheckPTInfo() 
257   {
258     if (ckBuffer) delete ckBuffer; 
259   }
260   inline void updateBuffer(CkArrayCheckPTMessage *data) 
261   {
262     CmiAssert(data!=NULL);
263     if (ckBuffer) delete ckBuffer;
264     ckBuffer = data;
265   }    
266   inline CkArrayCheckPTMessage * getCopy()
267   {
268     if (ckBuffer == NULL) {
269       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
270       CmiAbort("Abort!");
271     }
272     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
273   }     
274   inline void updateBuddy(int b1, int b2) {
275      CmiAssert(ckBuffer);
276      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
277      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
278      CmiAssert(pNo != CkMyPe());
279   }
280   inline int getSize() { 
281      CmiAssert(ckBuffer);
282      return ckBuffer->len; 
283   }
284 };
285
286 // checkpoint holder class - for in-disk checkpointing
287 class CkDiskCheckPTInfo: public CkCheckPTInfo 
288 {
289   char *fname;
290   int bud1, bud2;
291   int len;                      // checkpoint size
292 public:
293   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
294   {
295 #if CMK_USE_MKSTEMP
296     fname = new char[64];
297     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
298     mkstemp(fname);
299 #else
300     fname=tmpnam(NULL);
301 #endif
302     bud1 = bud2 = -1;
303     len = 0;
304   }
305   ~CkDiskCheckPTInfo() 
306   {
307     remove(fname);
308   }
309   inline void updateBuffer(CkArrayCheckPTMessage *data) 
310   {
311     double t = CmiWallTimer();
312     // unpack it
313     envelope *env = UsrToEnv(data);
314     CkUnpackMessage(&env);
315     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
316     FILE *f = fopen(fname,"wb");
317     PUP::toDisk p(f);
318     CkPupMessage(p, (void **)&data);
319     // delay sync to the end because otherwise the messages are blocked
320 //    fsync(fileno(f));
321     fclose(f);
322     bud1 = data->bud1;
323     bud2 = data->bud2;
324     len = data->len;
325     delete data;
326     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
327   }
328   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
329   {
330     CkArrayCheckPTMessage *data;
331     FILE *f = fopen(fname,"rb");
332     PUP::fromDisk p(f);
333     CkPupMessage(p, (void **)&data);
334     fclose(f);
335     data->bud1 = bud1;                          // update the buddies
336     data->bud2 = bud2;
337     return data;
338   }
339   inline void updateBuddy(int b1, int b2) {
340      bud1 = b1; bud2 = b2;
341      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
342      CmiAssert(pNo != CkMyPe());
343   }
344   inline int getSize() { 
345      return len; 
346   }
347 };
348
349 CkMemCheckPT::CkMemCheckPT(int w)
350 {
351   int numnodes = 0;
352 #if NODE_CHECKPOINT
353   numnodes = CmiNumPhysicalNodes();
354 #else
355   numnodes = CkNumPes();
356 #endif
357 #if CK_NO_PROC_POOL
358   if (numnodes <= 2)
359 #else
360   if (numnodes  == 1)
361 #endif
362   {
363     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
364     _memChkptOn = 0;
365   }
366   inRestarting = 0;
367   inCheckpointing = 0;
368   recvCount = peCount = 0;
369   ackCount = 0;
370   expectCount = -1;
371   where = w;
372   replicaAlive = 1;
373 #if CMK_CONVERSE_MPI
374   void pingBuddy();
375   void pingCheckHandler();
376   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
377   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
378 #endif
379   chkpTable[0] = NULL;
380   chkpTable[1] = NULL;
381 }
382
383 CkMemCheckPT::~CkMemCheckPT()
384 {
385   int len = ckTable.length();
386   for (int i=0; i<len; i++) {
387     delete ckTable[i];
388   }
389 }
390
391 void CkMemCheckPT::pup(PUP::er& p) 
392
393   CBase_CkMemCheckPT::pup(p); 
394   p|cpStarter;
395   p|thisFailedPe;
396   p|failedPes;
397   p|ckCheckPTGroupID;           // recover global variable
398   p|cpCallback;                 // store callback
399   p|where;                      // where to checkpoint
400   p|peCount;
401   if (p.isUnpacking()) {
402     recvCount = 0;
403 #if CMK_CONVERSE_MPI
404     void pingBuddy();
405     void pingCheckHandler();
406     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
407     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
408 #endif
409   }
410 }
411
412 // called by checkpoint mgr to restore an array element
413 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
414 {
415 #if CMK_MEM_CHECKPOINT
416   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
417   // m->index.print();
418   PUP::fromMem p(m->packData);
419   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
420   CmiAssert(mgr);
421 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
422   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
423 #else
424   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
425 #endif
426
427   // find a list of array elements bound together
428   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
429   CmiAssert(elt);
430   CkLocRec_local *rec = elt->myRec;
431   CkVec<CkMigratable *> list;
432   mgr->migratableList(rec, list);
433   CmiAssert(list.length() > 0);
434   for (int l=0; l<list.length(); l++) {
435     elt = (ArrayElement *)list[l];
436     elt->budPEs[0] = m->bud1;
437     elt->budPEs[1] = m->bud2;
438     //    reset, may not needed now
439     // for now.
440     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
441       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
442       if (c) c->redNo = 0;
443     }
444   }
445 #endif
446 }
447
448 // return 1 if pe is a crashed processor
449 int CkMemCheckPT::isFailed(int pe)
450 {
451   for (int i=0; i<failedPes.length(); i++)
452     if (failedPes[i] == pe) return 1;
453   return 0;
454 }
455
456 // add pe into history list of all failed processors
457 void CkMemCheckPT::failed(int pe)
458 {
459   if (isFailed(pe)) return;
460   failedPes.push_back(pe);
461 }
462
463 int CkMemCheckPT::totalFailed()
464 {
465   return failedPes.length();
466 }
467
468 // create an checkpoint entry for array element of aid with index.
469 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
470 {
471   // error check, no duplicate
472   int idx, len = ckTable.size();
473   for (idx=0; idx<len; idx++) {
474     CkCheckPTInfo *entry = ckTable[idx];
475     if (index == entry->index) {
476       if (loc == entry->locMgr) {
477           // bindTo array elements
478           return;
479       }
480         // for array inheritance, the following check may fail
481         // because ArrayElement constructor of all superclasses are called
482       if (aid == entry->aid) {
483         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
484         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
485       }
486     }
487   }
488   CkCheckPTInfo *newEntry;
489   if (where == CkCheckPoint_inMEM)
490     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
491   else
492     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
493   ckTable.push_back(newEntry);
494   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
495 }
496
497 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
498 {
499 #if !CMK_CHKP_ALL       
500   int buddy = msg->bud1;
501   if (buddy == CkMyPe()) buddy = msg->bud2;
502   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
503   recvData(msg);
504     // ack
505   thisProxy[buddy].gotData();
506 #else
507   chkpTable[0] = NULL;
508   chkpTable[1] = NULL;
509   recvArrayCheckpoint(msg);
510   thisProxy[msg->bud2].gotData();
511 #endif
512 }
513
514 // loop through my checkpoint table and ask checkpointed array elements
515 // to send me checkpoint data.
516 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
517 {
518   checkpointed = 1;
519   cpCallback = cb;
520   cpStarter = starter;
521   inCheckpointing = 1;
522   if (CkMyPe() == cpStarter) {
523     startTime = CmiWallTimer();
524     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
525   }
526 #if CMK_CONVERSE_MPI
527   if(CmiNumPartition()==1)
528 #endif    
529   {
530
531 #if !CMK_CHKP_ALL
532   int len = ckTable.length();
533   for (int i=0; i<len; i++) {
534     CkCheckPTInfo *entry = ckTable[i];
535       // always let the bigger number processor send request
536     //if (CkMyPe() < entry->pNo) continue;
537       // always let the smaller number processor send request, may on same proc
538     if (!isMaster(entry->pNo)) continue;
539       // call inmem_checkpoint to the array element, ask it to send
540       // back checkpoint data via recvData().
541     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
542     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
543   }
544     // if my table is empty, then I am done
545   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
546 #else
547   startArrayCheckpoint();
548 #endif
549         sendProcData();
550   }
551 #if CMK_CONVERSE_MPI
552   else
553   {
554         startCheckpoint();
555   }
556 #endif
557   // pack and send proc level data
558 }
559
560 class MemElementPacker : public CkLocIterator{
561         private:
562                 CkLocMgr *locMgr;
563                 PUP::er &p;
564         public:
565                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
566                 void addLocation(CkLocation &loc){
567                         CkArrayIndexMax idx = loc.getIndex();
568                         CkGroupID gID = locMgr->ckGetGroupID();
569                         p|gID;
570                         p|idx;
571                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
572                 }
573 };
574
575 void pupAllElements(PUP::er &p){
576 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
577         int numElements;
578         if(!p.isUnpacking()){
579                 numElements = CkCountArrayElements();
580         }
581         p | numElements;
582         if(!p.isUnpacking()){
583                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
584         }
585 #endif
586 }
587
588 void CkMemCheckPT::startArrayCheckpoint(){
589 #if CMK_CHKP_ALL
590         int size;
591         {
592                 PUP::sizer psizer;
593                 pupAllElements(psizer);
594                 size = psizer.size();
595         }
596         int packSize = size/sizeof(double)+1;
597  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
598         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
599         msg->len = size;
600         msg->cp_flag = 1;
601         int budPEs[2];
602         msg->bud1=CkMyPe();
603         msg->bud2=ChkptOnPe(CkMyPe());
604         {
605                 PUP::toMem p(msg->packData);
606                 pupAllElements(p);
607         }
608         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
609         if(chkpTable[0]) delete chkpTable[0];
610         chkpTable[0] = msg;
611         //send the checkpoint to my 
612         recvCount++;
613 #endif
614 }
615
616 void CkMemCheckPT::startCheckpoint(){
617 #if CMK_CONVERSE_MPI
618         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
619     CkPrintf("in start checkpointing!!!!\n");
620   int size;
621   {
622     PUP::sizer p;
623     _handleProcData(p,CmiFalse);
624     size = p.size();
625   }
626         int packSize = size/sizeof(double)+1;
627         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
628         procMsg->len = size;
629         procMsg->cp_flag = 1;
630         {
631                 PUP::toMem p(procMsg->packData);
632                 _handleProcData(p,CmiFalse);
633         }
634         int pointer = CpvAccess(curPointer);
635         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
636                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
637         
638         {
639                 PUP::sizer psizer;
640                 pupAllElements(psizer);
641                 size = psizer.size();
642         }
643         packSize = size/sizeof(double)+1;
644         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
645         msg->len = size;
646         msg->cp_flag = 1;
647         {
648                 PUP::toMem p(msg->packData);
649                 pupAllElements(p);
650         }
651         pointer = CpvAccess(curPointer);
652         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
653     CkPrintf("start checkpointing!!!!\n");
654         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
655                 CpvAccess(chkpBuf)[pointer] = msg;
656         if(CkReplicaAlive()==1){
657                 CpvAccess(recvdLocal) = 1;
658                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
659                 CkPackMessage(&env);
660                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
661                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
662         }
663         if(CpvAccess(recvdRemote)==1){
664                 //compare the checkpoint 
665           int size = CpvAccess(chkpBuf)[pointer]->len;
666           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
667                 thisProxy[CkMyPe()].doneComparison(true);
668           }else{
669                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
670                 thisProxy[CkMyPe()].doneComparison(false);
671           }
672   }
673         else{
674                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
675                         {       
676                                 int pointer = CpvAccess(curPointer);
677                                 //send the proc data
678                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
679                                 procMsg->pointer = pointer;
680                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
681                                 CkPackMessage(&env);
682                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
683                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
684                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
685                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
686                                 }
687                         }
688                         //send the array checkpoint data
689                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
690                         msg->pointer = CpvAccess(curPointer);
691                         envelope * env = (envelope *)(UsrToEnv(msg));
692                         CkPackMessage(&env);
693                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
694                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
695                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
696                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
697                         //can continue work, no need to wait for my replica
698                 }
699         }
700 #endif
701 }
702
703 void CkMemCheckPT::doneComparison(bool ret){
704         int _ret;
705         if(!ret){
706         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
707                 _ret = 0;
708         }else{
709                 _ret = 1;
710         }
711         inCheckpointing = 0;
712         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
713         contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
714 }
715
716 void CkMemCheckPT::doneRComparison(int ret){
717         CpvAccess(recvdRemote) = 0;
718         CpvAccess(recvdLocal) = 0;
719 //      if(CpvAccess(curPointer) == 0){
720         if(ret==1){
721         CpvAccess(curPointer)^=1;
722                 if(CkMyPe() == 0){
723                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
724                         cpCallback.send();
725                 }
726         }else{
727         CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
728                 RollBack();
729         }
730 }
731
732 void CkMemCheckPT::RollBack(){
733         //restore group data
734         checkpointed = 0;
735         CkMemCheckPT::inRestarting = 1;
736         int pointer = CpvAccess(curPointer)^1;//use the previous one
737     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
738         PUP::fromMem p(chkpMsg->packData);      
739         //_handleProcData(p);
740         
741         //destroy array elements
742         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
743           int numGroups = CkpvAccess(_groupIDTable)->size();
744           for(int i=0;i<numGroups;i++) {
745                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
746                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
747                 obj->flushStates();
748                 obj->ckJustMigrated();
749           }
750         //restore array elements
751         
752         int numElements;
753         p|numElements;
754         
755         if(p.isUnpacking()){
756                 for(int i=0;i<numElements;i++){
757                 //for(int i=0;i<1;i++){
758                         CkGroupID gID;
759                         CkArrayIndex idx;
760                         p|gID;
761                         p|idx;
762                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
763                         CmiAssert(mgr);
764                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
765                 }
766         }
767         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
768         contribute(cb);
769 }
770
771 void CkMemCheckPT::notifyReplicaDie(int pe){
772         CkPrintf("[%d] receive replica die\n",CkMyPe());
773         replicaAlive = 0;
774         CpvAccess(_remoteCrashedNode) = pe;
775 }
776
777 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
778 {
779 #if CMK_CHKP_ALL
780         int idx = 1;
781         if(msg->bud1 == CkMyPe()){
782                 idx = 0;
783         }
784         int isChkpting = msg->cp_flag;
785         if(isChkpting == 1){
786                 if(chkpTable[idx]) delete chkpTable[idx];
787         }
788         chkpTable[idx] = msg;
789         if(isChkpting){
790                 recvCount++;
791                 if(recvCount == 2){
792                   if (where == CkCheckPoint_inMEM) {
793                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
794                   }
795                   else if (where == CkCheckPoint_inDISK) {
796                         // another barrier for finalize the writing using fsync
797                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
798                         contribute(0,NULL,CkReduction::sum_int,localcb);
799                   }
800                   else
801                         CmiAbort("Unknown checkpoint scheme");
802                   recvCount = 0;
803                 }
804         }
805 #endif
806 }
807
808 // don't handle array elements
809 static inline void _handleProcData(PUP::er &p, CmiBool create)
810 {
811     // save readonlys, and callback BTW
812     CkPupROData(p);
813
814     // save mainchares 
815     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
816         
817 #ifndef CMK_CHARE_USE_PTR
818     // save non-migratable chare
819     CkPupChareData(p);
820 #endif
821
822     // save groups into Groups.dat
823     CkPupGroupData(p,create);
824
825     // save nodegroups into NodeGroups.dat
826     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
827 }
828
829 void CkMemCheckPT::sendProcData()
830 {
831   // find out size of buffer
832   int size;
833   {
834     PUP::sizer p;
835     _handleProcData(p,CmiTrue);
836     size = p.size();
837   }
838   int packSize = size;
839   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
840   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
841   {
842     PUP::toMem p(msg->packData);
843     _handleProcData(p,CmiTrue);
844   }
845   msg->pe = CkMyPe();
846   msg->len = size;
847   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
848   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
849 }
850
851 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
852 {
853   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
854   CpvAccess(procChkptBuf) = msg;
855   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
856   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
857 }
858
859 // ArrayElement call this function to give us the checkpointed data
860 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
861 {
862   int len = ckTable.length();
863   int idx;
864   for (idx=0; idx<len; idx++) {
865     CkCheckPTInfo *entry = ckTable[idx];
866     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
867   }
868   CkAssert(idx < len);
869   int isChkpting = msg->cp_flag;
870   ckTable[idx]->updateBuffer(msg);
871   if (isChkpting) {
872       // all my array elements have returned their inmem data
873       // inform starter processor that I am done.
874     recvCount ++;
875     if (recvCount == ckTable.length()) {
876       if (where == CkCheckPoint_inMEM) {
877         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
878       }
879       else if (where == CkCheckPoint_inDISK) {
880         // another barrier for finalize the writing using fsync
881         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
882         contribute(0,NULL,CkReduction::sum_int,localcb);
883       }
884       else
885         CmiAbort("Unknown checkpoint scheme");
886       recvCount = 0;
887     } 
888   }
889 }
890
891 // only used in disk checkpointing
892 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
893 {
894   delete m;
895 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
896   system("sync");
897 #endif
898   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
899 }
900
901 // only is called on cpStarter when checkpoint is done
902 void CkMemCheckPT::cpFinish()
903 {
904   CmiAssert(CkMyPe() == cpStarter);
905   peCount++;
906     // now that all processors have finished, activate callback
907   if (peCount == 2) 
908 {
909     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
910     cpCallback.send();
911     peCount = 0;
912     thisProxy.report();
913   }
914 }
915
916 // for debugging, report checkpoint info
917 void CkMemCheckPT::report()
918 {
919         inCheckpointing = 0;
920 #if !CMK_CHKP_ALL
921   int objsize = 0;
922   int len = ckTable.length();
923   for (int i=0; i<len; i++) {
924     CkCheckPTInfo *entry = ckTable[i];
925     CmiAssert(entry);
926     objsize += entry->getSize();
927   }
928   CmiAssert(CpvAccess(procChkptBuf));
929   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
930 #else
931   if(CkMyPe()==0)
932   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
933 #endif
934 }
935
936 /*****************************************************************************
937                         RESTART Procedure
938 *****************************************************************************/
939
940 // master processor of two buddies
941 inline int CkMemCheckPT::isMaster(int buddype)
942 {
943 #if 0
944   int mype = CkMyPe();
945 //CkPrintf("ismaster: %d %d\n", pe, mype);
946   if (CkNumPes() - totalFailed() == 2) {
947     return mype > buddype;
948   }
949   for (int i=1; i<CkNumPes(); i++) {
950     int me = (buddype+i)%CkNumPes();
951     if (isFailed(me)) continue;
952     if (me == mype) return 1;
953     else return 0;
954   }
955   return 0;
956 #else
957     // smaller one
958   int mype = CkMyPe();
959 //CkPrintf("ismaster: %d %d\n", pe, mype);
960   if (CkNumPes() - totalFailed() == 2) {
961     return mype < buddype;
962   }
963 #if NODE_CHECKPOINT
964   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
965   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
966 #else
967   for (int i=1; i<CkNumPes(); i++) {
968 #endif
969     int me = (mype+i)%CkNumPes();
970     if (isFailed(me)) continue;
971     if (me == buddype) return 1;
972     else return 0;
973   }
974   return 0;
975 #endif
976 }
977
978
979
980 #if 0
981 // helper class to pup all elements that belong to same ckLocMgr
982 class ElementDestoryer : public CkLocIterator {
983 private:
984         CkLocMgr *locMgr;
985 public:
986         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
987         void addLocation(CkLocation &loc) {
988                 CkArrayIndex idx=loc.getIndex();
989                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
990                 loc.destroy();
991         }
992 };
993 #endif
994
995 // restore the bitmap vector for LB
996 void CkMemCheckPT::resetLB(int diepe)
997 {
998 #if CMK_LBDB_ON
999   int i;
1000   char *bitmap = new char[CkNumPes()];
1001   // set processor available bitmap
1002   get_avail_vector(bitmap);
1003
1004   for (i=0; i<failedPes.length(); i++)
1005     bitmap[failedPes[i]] = 0; 
1006   bitmap[diepe] = 0;
1007
1008 #if CK_NO_PROC_POOL
1009   set_avail_vector(bitmap);
1010 #endif
1011
1012   // if I am the crashed pe, rebuild my failedPEs array
1013   if (CkMyNode() == diepe)
1014     for (i=0; i<CkNumPes(); i++) 
1015       if (bitmap[i]==0) failed(i);
1016
1017   delete [] bitmap;
1018 #endif
1019 }
1020
1021 static double restartT;
1022
1023 // in case when failedPe dies, everybody go through its checkpoint table:
1024 // destory all array elements
1025 // recover lost buddies
1026 // reconstruct all array elements from check point data
1027 // called on all processors
1028 void CkMemCheckPT::restart(int diePe)
1029 {
1030 #if CMK_MEM_CHECKPOINT
1031   double curTime = CmiWallTimer();
1032   if (CkMyPe() == diePe){
1033            restartT = CmiWallTimer();
1034     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1035   }
1036   stage = (char*)"resetLB";
1037   startTime = curTime;
1038   if (CkMyPe() == diePe)
1039     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1040
1041 #if CK_NO_PROC_POOL
1042   failed(diePe);        // add into the list of failed pes
1043 #endif
1044   thisFailedPe = diePe;
1045
1046   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1047
1048   inRestarting = 1;
1049                                                                                 
1050   // disable load balancer's barrier
1051   //if (CkMyPe() != diePe) resetLB(diePe);
1052
1053   CKLOCMGR_LOOP(mgr->startInserting(););
1054
1055
1056   if(CmiNumPartition()==1){
1057         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1058   }else{
1059         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1060         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1061   }
1062 #endif
1063 }
1064
1065 // loally remove all array elements
1066 void CkMemCheckPT::removeArrayElements()
1067 {
1068 #if CMK_MEM_CHECKPOINT
1069   int len = ckTable.length();
1070   double curTime = CmiWallTimer();
1071   if (CkMyPe() == thisFailedPe) 
1072     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1073   stage = (char*)"removeArrayElements";
1074   startTime = curTime;
1075
1076 //  if (cpCallback.isInvalid()) 
1077 //        CkPrintf("invalid pe %d\n",CkMyPe());
1078 //        CkAbort("Didn't set restart callback\n");;
1079   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1080
1081   // get rid of all buffering and remote recs
1082   // including destorying all array elements
1083 #if CK_NO_PROC_POOL  
1084         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1085 #else
1086         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1087 #endif
1088   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1089 #endif
1090 }
1091
1092 // flush state in reduction manager
1093 void CkMemCheckPT::resetReductionMgr()
1094 {
1095   if (CkMyPe() == thisFailedPe) 
1096     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1097   int numGroups = CkpvAccess(_groupIDTable)->size();
1098   for(int i=0;i<numGroups;i++) {
1099     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1100     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1101     obj->flushStates();
1102     obj->ckJustMigrated();
1103   }
1104   // reset again
1105   //CpvAccess(_qd)->flushStates();
1106   if(CmiNumPartition()==1){
1107         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1108   }
1109   else
1110         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1111 }
1112
1113 // recover the lost buddies
1114 void CkMemCheckPT::recoverBuddies()
1115 {
1116   int idx;
1117   int len = ckTable.length();
1118   // ready to flush reduction manager
1119   // cannot be CkMemCheckPT::restart because destory will modify states
1120   double curTime = CmiWallTimer();
1121   if (CkMyPe() == thisFailedPe)
1122   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1123   stage = (char *)"recoverBuddies";
1124   if (CkMyPe() == thisFailedPe)
1125   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1126   startTime = curTime;
1127
1128   // recover buddies
1129   expectCount = 0;
1130 #if !CMK_CHKP_ALL
1131   for (idx=0; idx<len; idx++) {
1132     CkCheckPTInfo *entry = ckTable[idx];
1133     if (entry->pNo == thisFailedPe) {
1134 #if CK_NO_PROC_POOL
1135       // find a new buddy
1136       int budPe = BuddyPE(CkMyPe());
1137 #else
1138       int budPe = thisFailedPe;
1139 #endif
1140       CkArrayCheckPTMessage *msg = entry->getCopy();
1141       msg->bud1 = budPe;
1142       msg->bud2 = CkMyPe();
1143       msg->cp_flag = 0;            // not checkpointing
1144       thisProxy[budPe].recoverEntry(msg);
1145       expectCount ++;
1146     }
1147   }
1148 #else
1149   //send to failed pe
1150   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1151 #if CK_NO_PROC_POOL
1152       // find a new buddy
1153       int budPe = BuddyPE(CkMyPe());
1154 #else
1155       int budPe = thisFailedPe;
1156 #endif
1157       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1158       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1159           msg->cp_flag = 0;            // not checkpointing
1160       msg->bud1 = budPe;
1161       msg->bud2 = CkMyPe();
1162       thisProxy[budPe].recoverEntry(msg);
1163       expectCount ++;
1164   }
1165 #endif
1166
1167   if (expectCount == 0) {
1168           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1169   }
1170   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1171 }
1172
1173 void CkMemCheckPT::gotData()
1174 {
1175   ackCount ++;
1176   if (ackCount == expectCount) {
1177     ackCount = 0;
1178     expectCount = -1;
1179     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1180     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1181   }
1182 }
1183
1184 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1185 {
1186
1187   for (int i=0; i<n; i++) {
1188     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1189     mgr->updateLocation(idx[i], nowOnPe);
1190   }
1191         thisProxy[nowOnPe].gotReply();
1192 }
1193
1194 // restore array elements
1195 void CkMemCheckPT::recoverArrayElements()
1196 {
1197   double curTime = CmiWallTimer();
1198   int len = ckTable.length();
1199   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1200   stage = (char *)"recoverArrayElements";
1201   if (CkMyPe() == thisFailedPe)
1202   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1203   startTime = curTime;
1204  int flag = 0;
1205   // recover all array elements
1206   int count = 0;
1207
1208 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1209   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1210   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1211 #endif
1212
1213 #if !CMK_CHKP_ALL
1214   for (int idx=0; idx<len; idx++)
1215   {
1216     CkCheckPTInfo *entry = ckTable[idx];
1217 #if CK_NO_PROC_POOL
1218     // the bigger one will do 
1219 //    if (CkMyPe() < entry->pNo) continue;
1220     if (!isMaster(entry->pNo)) continue;
1221 #else
1222     // smaller one do it, which has the original object
1223     if (CkMyPe() == entry->pNo+1 || 
1224         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1225 #endif
1226 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1227
1228     entry->updateBuddy(CkMyPe(), entry->pNo);
1229     CkArrayCheckPTMessage *msg = entry->getCopy();
1230     // gzheng
1231     //thisProxy[CkMyPe()].inmem_restore(msg);
1232     inmem_restore(msg);
1233 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1234     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1235     int homePe = mgr->homePe(msg->index);
1236     if (homePe != CkMyPe()) {
1237       gmap[homePe].push_back(msg->locMgr);
1238       imap[homePe].push_back(msg->index);
1239     }
1240 #endif
1241     CkFreeMsg(msg);
1242     count ++;
1243   }
1244 #else
1245         double * packData;
1246         if(CmiNumPartition()==1){
1247                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1248                 packData = msg->packData;
1249         }
1250         else{
1251                 int pointer = CpvAccess(curPointer);
1252                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1253                 packData = msg->packData;
1254         }
1255 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1256         recoverAll(packData,gmap,imap);
1257 #else
1258         recoverAll(packData);
1259 #endif
1260 #endif
1261   curTime = CmiWallTimer();
1262   //if (CkMyPe() == thisFailedPe)
1263   if (CkMyPe() == 0)
1264         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1265 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1266   for (int i=0; i<CkNumPes(); i++) {
1267     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1268       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1269         flag++; 
1270         }
1271   }
1272   delete [] imap;
1273   delete [] gmap;
1274 #endif
1275   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1276
1277   CKLOCMGR_LOOP(mgr->doneInserting(););
1278
1279   // _crashedNode = -1;
1280   CpvAccess(_crashedNode) = -1;
1281   inRestarting = 0;
1282 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1283   if (CkMyPe() == 0)
1284     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1285 #else
1286 if(flag == 0)
1287 {
1288     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1289 }
1290 #endif
1291 }
1292
1293 void CkMemCheckPT::gotReply(){
1294     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1295 }
1296
1297 void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1298 #if CMK_CHKP_ALL
1299         PUP::fromMem p(packData);
1300         int numElements;
1301         p|numElements;
1302         if(p.isUnpacking()){
1303                 for(int i=0;i<numElements;i++){
1304                         CkGroupID gID;
1305                         CkArrayIndex idx;
1306                         p|gID;
1307                         p|idx;
1308                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1309                         int homePe = mgr->homePe(idx);
1310 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1311                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1312 #else
1313                         if(CmiNumPartition()==1)
1314                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1315                         else{
1316                                 if(CkMyPe()==thisFailedPe){
1317                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1318                                 }
1319                         else{
1320                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1321                                 }
1322                         }
1323 #endif
1324 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1325                         homePe = mgr->homePe(idx);
1326                         if (homePe != CkMyPe()) {
1327                           gmap[homePe].push_back(gID);
1328                           imap[homePe].push_back(idx);
1329                         }
1330 #endif
1331                 }
1332         }
1333 #endif
1334 }
1335
1336
1337 // on every processor
1338 // turn load balancer back on
1339 void CkMemCheckPT::finishUp()
1340 {
1341   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1342   //CKLOCMGR_LOOP(mgr->doneInserting(););
1343   
1344   if (CkMyPe() == thisFailedPe)
1345   {
1346        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1347        cpCallback.send();
1348        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1349   }
1350         
1351 #if CMK_CONVERSE_MPI    
1352   if(CmiNumPartition()!=1){
1353         CpvAccess(recvdProcChkp) = 0;
1354         CpvAccess(recvdArrayChkp) = 0;
1355         CpvAccess(curPointer)^=1;
1356         //notify my replica, restart is done
1357    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1358    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1359    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1360   }
1361    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1362          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1363    }
1364 #endif
1365
1366 #if CK_NO_PROC_POOL
1367 #if NODE_CHECKPOINT
1368   int numnodes = CmiNumPhysicalNodes();
1369 #else
1370   int numnodes = CkNumPes();
1371 #endif
1372   if (numnodes-totalFailed() <=2) {
1373     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1374     _memChkptOn = 0;
1375   }
1376 #endif
1377 }
1378
1379 void CkMemCheckPT::recoverFromSoftFailure()
1380 {
1381         inRestarting = 0;
1382         if(CkMyPe()==0)
1383         cpCallback.send();
1384 }
1385 // called only on 0
1386 void CkMemCheckPT::quiescence(CkCallback &cb)
1387 {
1388   static int pe_count = 0;
1389   pe_count ++;
1390   CmiAssert(CkMyPe() == 0);
1391   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1392   if (pe_count == CkNumPes()) {
1393     pe_count = 0;
1394     cb.send();
1395   }
1396 }
1397
1398 // User callable function - to start a checkpoint
1399 // callback cb is used to pass control back
1400 void CkStartMemCheckpoint(CkCallback &cb)
1401 {
1402 #if CMK_MEM_CHECKPOINT
1403         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1404   if (_memChkptOn == 0) {
1405     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1406     cb.send();
1407     return;
1408   }
1409   if (CkInRestarting()) {
1410       // trying to checkpointing during restart
1411     cb.send();
1412     return;
1413   }
1414     // store user callback and user data
1415   CkMemCheckPT::cpCallback = cb;
1416
1417     // broadcast to start check pointing
1418   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1419   checkptMgr.doItNow(CkMyPe(), cb);
1420 #else
1421   // when mem checkpoint is disabled, invike cb immediately
1422   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1423   cb.send();
1424 #endif
1425 }
1426
1427 void CkRestartCheckPoint(int diePe)
1428 {
1429 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1430   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1431   // broadcast
1432   checkptMgr.restart(diePe);
1433 }
1434
1435 static int _diePE = -1;
1436
1437 // callback function used locally by ccs handler
1438 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1439 {
1440 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1441   CkRestartCheckPoint(_diePE);
1442 }
1443
1444
1445 // called on crashed PE
1446 static void restartBeginHandler(char *msg)
1447 {
1448   CmiFree(msg);
1449 #if CMK_MEM_CHECKPOINT
1450 #if CMK_USE_BARRIER
1451         if(CkMyPe()!=_diePE){
1452                 printf("restar begin on %d\n",CkMyPe());
1453                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1454                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1455                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1456         }else{
1457         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1458         CkRestartCheckPointCallback(NULL, NULL);
1459         }
1460 #else
1461   static int count = 0;
1462   CmiAssert(CkMyPe() == _diePE);
1463   count ++;
1464   if (count == CkNumPes()) {
1465           printf("restart begin on %d\n",CkMyPe());
1466     CkRestartCheckPointCallback(NULL, NULL);
1467     count = 0;
1468   }
1469 #endif
1470 #endif
1471 }
1472
1473 extern void _discard_charm_message();
1474 extern void _resume_charm_message();
1475
1476 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1477         return data;
1478 }
1479
1480 static void restartBcastHandler(char *msg)
1481 {
1482 #if CMK_MEM_CHECKPOINT
1483   // advance phase counter
1484   CkMemCheckPT::inRestarting = 1;
1485   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1486   // gzheng
1487   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1488
1489   if (CkMyPe()==_diePE)
1490     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1491
1492   // reset QD counters
1493 /*  gzheng
1494   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1495 */
1496
1497 /*  gzheng
1498   if (CkMyPe()==_diePE)
1499       CkRestartCheckPointCallback(NULL, NULL);
1500 */
1501   CmiFree(msg);
1502
1503   _resume_charm_message();
1504
1505     // reduction
1506   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1507   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1508 #if CMK_USE_BARRIER
1509         //CmiPrintf("before reduce\n"); 
1510         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1511         //CmiPrintf("after reduce\n");  
1512 #else
1513   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1514 #endif 
1515  checkpointed = 0;
1516 #endif
1517 }
1518
1519 extern void _initDone();
1520
1521 bool compare(char * buf1, char *buf2){
1522         //buf1 my copy, buf2 from another one 
1523 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1524         PUP::checker pchecker(buf1,buf2);
1525         pchecker.skip();
1526         
1527         int numElements;
1528         pchecker|numElements;
1529         CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1530         for(int i=0;i<numElements;i++){
1531         //for(int i=0;i<1;i++){
1532                 CkGroupID gID;
1533                 CkArrayIndex idx;
1534                 
1535                 pchecker|gID;
1536                 pchecker|idx;
1537                 
1538                 CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1539                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1540                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1541                 CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1542         }
1543         if(pchecker.getResult()){
1544                 CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
1545         }else{
1546                 CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
1547         }
1548         return pchecker.getResult();
1549 }
1550
1551 static void recvRemoteChkpHandler(char *msg){
1552    envelope *env = (envelope *)msg;
1553    CkUnpackMessage(&env);
1554    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1555   if(CpvAccess(recvdLocal)==1){
1556           int pointer = CpvAccess(curPointer);
1557           int size = CpvAccess(chkpBuf)[pointer]->len;
1558           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1559           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1560                         checkptMgr[CkMyPe()].doneComparison(true);
1561           }else
1562           {
1563                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1564                         checkptMgr[CkMyPe()].doneComparison(false);
1565           }
1566   }else{
1567           CpvAccess(recvdRemote) = 1;
1568           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1569           CpvAccess(buddyBuf) = chkpMsg;
1570   }  
1571 }
1572
1573 static void replicaRecoverHandler(char *msg){
1574         CpvAccess(_remoteCrashedNode) = -1;
1575         CkMemCheckPT::replicaAlive = 1;
1576     bool ret = true;
1577         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1578         checkptMgr[CkMyPe()].doneComparison(ret);
1579 }
1580
1581 static void replicaDieHandler(char * msg){
1582 #if CMK_CONVERSE_MPI    
1583         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1584         CpvAccess(_remoteCrashedNode) = diePe;
1585         CkMemCheckPT::replicaAlive = 0;
1586         find_spare_mpirank(diePe,CmiMyPartition()^1);
1587     if(CkMyPe()==diePe){
1588       CkPrintf("pe %d in replicad word die\n",diePe);
1589             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1590     }
1591 #endif
1592         //broadcast to my partition
1593         //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1594         //checkptMgr.notifyReplicaDie(diePe);
1595     //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1596     //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1597 }
1598
1599
1600 static void replicaDieBcastHandler(char *msg){
1601         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1602         CpvAccess(_remoteCrashedNode) = diePe;
1603         CkMemCheckPT::replicaAlive = 0;
1604 }
1605
1606 static void recoverRemoteProcDataHandler(char *msg){
1607    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1608    envelope *env = (envelope *)msg;
1609    CkUnpackMessage(&env);
1610    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1611         
1612    //store the checkpoint
1613         int pointer = procMsg->pointer;
1614
1615
1616    if(CkMyPe()==CpvAccess(_crashedNode)){
1617            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1618            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1619            PUP::fromMem p(procMsg->packData);
1620            _handleProcData(p,CmiTrue);
1621            _initDone();
1622    }
1623    else{
1624            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1625            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1626            //_handleProcData(p,CmiFalse);
1627    }
1628    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1629    CKLOCMGR_LOOP(mgr->startInserting(););
1630    
1631    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1632    CpvAccess(recvdProcChkp) =1;
1633         if(CpvAccess(recvdArrayChkp)==1){
1634                 _resume_charm_message();
1635                 _diePE = CpvAccess(_crashedNode);
1636                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1637                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1638                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1639         }
1640 }
1641
1642 static void recoverRemoteArrayDataHandler(char *msg){
1643   if(CkMyPe()==CpvAccess(_crashedNode)) 
1644   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1645    envelope *env = (envelope *)msg;
1646    CkUnpackMessage(&env);
1647    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1648         
1649    //store the checkpoint
1650         int pointer = chkpMsg->pointer;
1651         CpvAccess(curPointer) = pointer;
1652         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1653         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1654    CpvAccess(recvdArrayChkp) =1;
1655         CkMemCheckPT::inRestarting = 1;
1656         if(CpvAccess(recvdProcChkp) == 1){
1657                 _resume_charm_message();
1658                 _diePE = CpvAccess(_crashedNode);
1659                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1660                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1661                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1662                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1663         }
1664 }
1665
1666 static void recvPhaseHandler(char * msg)
1667 {
1668         CpvAccess(_curRestartPhase)--;
1669         CkMemCheckPT::inRestarting = 1;
1670         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1671   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1672   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1673   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1674 //              CmiPrintf("start ping check handler\n");
1675 //       }
1676         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1677   // }
1678    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1679    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1680
1681 }
1682 // called on crashed processor
1683 static void recoverProcDataHandler(char *msg)
1684 {
1685 #if CMK_MEM_CHECKPOINT
1686    int i;
1687    envelope *env = (envelope *)msg;
1688    CkUnpackMessage(&env);
1689    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1690    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1691    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1692    PUP::fromMem p(procMsg->packData);
1693    _handleProcData(p,CmiTrue);
1694
1695    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1696    // gzheng
1697    CKLOCMGR_LOOP(mgr->startInserting(););
1698
1699    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1700    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1701    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1702    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1703
1704    _initDone();
1705 //   CpvAccess(_qd)->flushStates();
1706    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1707 #endif
1708 }
1709 //for replica, got the phase number from my neighbor processor in the current partition
1710 static void askPhaseHandler(char *msg)
1711 {
1712 #if CMK_MEM_CHECKPOINT
1713         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1714     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1715         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1716     CmiSetHandler(msg, recvPhaseHandlerIdx);
1717         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1718 #endif
1719 }
1720 // called on its backup processor
1721 // get backup message buffer and sent to crashed processor
1722 static void askProcDataHandler(char *msg)
1723 {
1724 #if CMK_MEM_CHECKPOINT
1725     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1726     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1727     if (CpvAccess(procChkptBuf) == NULL)  {
1728       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1729       CkAbort("no checkpoint found");
1730     }
1731     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1732     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1733
1734     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1735
1736     CkPackMessage(&env);
1737     CmiSetHandler(env, recoverProcDataHandlerIdx);
1738     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1739     CpvAccess(procChkptBuf) = NULL;
1740     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1741 #endif
1742 }
1743
1744 // called on PE 0
1745 void qd_callback(void *m)
1746 {
1747    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1748    CkFreeMsg(m);
1749    if(CmiNumPartition()==1){
1750 #ifdef CMK_SMP
1751            for(int i=0;i<CmiMyNodeSize();i++){
1752            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1753            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1754                 CmiSetHandler(msg, askProcDataHandlerIdx);
1755                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1756                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1757            }
1758            return;
1759 #endif
1760            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1761            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1762            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1763            CmiSetHandler(msg, askProcDataHandlerIdx);
1764            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1765            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1766         }
1767    else{
1768                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1769                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1770                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1771    }
1772 }
1773
1774 // on crashed node
1775 void CkMemRestart(const char *dummy, CkArgMsg *args)
1776 {
1777 #if CMK_MEM_CHECKPOINT
1778    _diePE = CmiMyNode();
1779    CpvAccess( _crashedNode )= CmiMyNode();
1780    CkMemCheckPT::inRestarting = 1;
1781    _discard_charm_message();
1782   
1783   /*if(CmiMyRank()==0){
1784     CkCallback cb(qd_callback);
1785     CkStartQD(cb);
1786     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1787   }*/
1788    if(CmiNumPartition()==1){
1789            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1790                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1791                 restartT = CmiWallTimer();
1792            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1793            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1794            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1795            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1796            CmiSetHandler(msg, askProcDataHandlerIdx);
1797            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1798            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1799    }
1800    else{
1801                 CkCallback cb(qd_callback);
1802                 CkStartQD(cb);
1803    }
1804 #else
1805    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1806 #endif
1807 }
1808
1809 // can be called in other files
1810 // return true if it is in restarting
1811 extern "C"
1812 int CkInRestarting()
1813 {
1814 #if CMK_MEM_CHECKPOINT
1815   if (CpvAccess( _crashedNode)!=-1) return 1;
1816   // gzheng
1817   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1818   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1819   return CkMemCheckPT::inRestarting;
1820 #else
1821   return 0;
1822 #endif
1823 }
1824
1825 extern "C"
1826 int CkReplicaAlive()
1827 {
1828         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1829           CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1830         return CkMemCheckPT::replicaAlive;
1831
1832         /*if(CkMemCheckPT::replicaDead==1)
1833                 return 0;
1834         else            
1835                 return 1;*/
1836 }
1837
1838 extern "C"
1839 int CkInCheckpointing()
1840 {
1841         return CkMemCheckPT::inCheckpointing;
1842 }
1843
1844 extern "C"
1845 void CkSetInLdb(){
1846 #if CMK_MEM_CHECKPOINT
1847         CkMemCheckPT::inLoadbalancing = 1;
1848 #endif
1849 }
1850
1851 extern "C"
1852 int CkInLdb(){
1853 #if CMK_MEM_CHECKPOINT
1854         return CkMemCheckPT::inLoadbalancing;
1855 #endif
1856         return 0;
1857 }
1858
1859 extern "C"
1860 void CkResetInLdb(){
1861 #if CMK_MEM_CHECKPOINT
1862         CkMemCheckPT::inLoadbalancing = 0;
1863 #endif
1864 }
1865
1866 /*****************************************************************************
1867                 module initialization
1868 *****************************************************************************/
1869
1870 static int arg_where = CkCheckPoint_inMEM;
1871
1872 #if CMK_MEM_CHECKPOINT
1873 void init_memcheckpt(char **argv)
1874 {
1875     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1876       arg_where = CkCheckPoint_inDISK;
1877     }
1878
1879         // initiliazing _crashedNode variable
1880         CpvInitialize(int, _crashedNode);
1881         CpvInitialize(int, _remoteCrashedNode);
1882         CpvAccess(_crashedNode) = -1;
1883         CpvAccess(_remoteCrashedNode) = -1;
1884 }
1885 #endif
1886
1887 class CkMemCheckPTInit: public Chare {
1888 public:
1889   CkMemCheckPTInit(CkArgMsg *m) {
1890 #if CMK_MEM_CHECKPOINT
1891     if (arg_where == CkCheckPoint_inDISK) {
1892       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1893     }
1894     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1895     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1896 #endif
1897   }
1898 };
1899
1900 static void notifyHandler(char *msg)
1901 {
1902 #if CMK_MEM_CHECKPOINT
1903   CmiFree(msg);
1904       /* immediately increase restart phase to filter old messages */
1905   CpvAccess(_curRestartPhase) ++;
1906   CpvAccess(_qd)->flushStates();
1907   _discard_charm_message();
1908
1909 #endif
1910 }
1911
1912 extern "C"
1913 void notify_crash(int node)
1914 {
1915 #ifdef CMK_MEM_CHECKPOINT
1916   CpvAccess( _crashedNode) = node;
1917 #ifdef CMK_SMP
1918   for(int i=0;i<CkMyNodeSize();i++){
1919         CpvAccessOther(_crashedNode,i)=node;
1920   }
1921 #endif
1922   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1923   CkMemCheckPT::inRestarting = 1;
1924
1925     // this may be in interrupt handler, send a message to reset QD
1926   int pe = CmiNodeFirst(CkMyNode());
1927   for(int i=0;i<CkMyNodeSize();i++){
1928         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1929         CmiSetHandler(msg, notifyHandlerIdx);
1930         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1931   }
1932 #endif
1933 }
1934
1935 extern "C" void (*notify_crash_fn)(int node);
1936
1937
1938 #if CMK_CONVERSE_MPI
1939 void buddyDieHandler(char *msg)
1940 {
1941 #if CMK_MEM_CHECKPOINT
1942    // notify
1943         CkMemCheckPT::inRestarting = 1;
1944    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1945    notify_crash(diepe);
1946    // send message to crash pe to let it restart
1947    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1948    CkPrintf("[%d] finding newrank\n",CkMyPe());
1949    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1950    CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1951    int buddy = obj->BuddyPE(CmiMyPe());
1952    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1953    if (buddy == diepe)  {
1954      mpi_restart_crashed(diepe, newrank);
1955    }
1956 #endif
1957 }
1958
1959 void pingHandler(void *msg)
1960 {
1961   lastPingTime = CmiWallTimer();
1962   CmiFree(msg);
1963 }
1964
1965 void pingCheckHandler()
1966 {
1967 #if CMK_MEM_CHECKPOINT
1968   double now = CmiWallTimer();
1969   if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
1970   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
1971     int i, pe, buddy;
1972     // tell everyone the buddy dies
1973     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1974     for (i = 1; i < CmiNumPes(); i++) {
1975        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1976        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1977     }
1978     buddy = pe;
1979     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
1980     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1981       if (obj->isFailed(pe) || pe == buddy) continue;
1982       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1983       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1984       CmiSetHandler(msg, buddyDieHandlerIdx);
1985       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1986     }*/
1987     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1988     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1989     CmiSetHandler(msg, buddyDieHandlerIdx);
1990     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1991     //send to everyone in the other world
1992         if(CmiNumPartition()!=1){
1993                 for(int i=0;i<CmiNumPes();i++){
1994                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1995                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
1996                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
1997                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
1998                 }
1999         }
2000   }
2001   else 
2002     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2003 #endif
2004 }
2005
2006 void pingBuddy()
2007 {
2008 #if CMK_MEM_CHECKPOINT
2009   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2010   if (obj) {
2011     int buddy = obj->BuddyPE(CkMyPe());
2012     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2013     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2014     CmiSetHandler(msg, pingHandlerIdx);
2015     CmiGetRestartPhase(msg) = 9999;
2016     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2017   }
2018   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2019 #endif
2020 }
2021 #endif
2022
2023 // initproc
2024 void CkRegisterRestartHandler( )
2025 {
2026 #if CMK_MEM_CHECKPOINT
2027   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2028   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2029   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2030   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2031   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2032   
2033   //for replica
2034   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2035   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2036   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2037   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2038   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2039   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2040   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2041   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2042
2043 #if CMK_CONVERSE_MPI
2044   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2045   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2046   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2047 #endif
2048
2049   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2050   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2051   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2052   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2053   CpvInitialize(int,curPointer);
2054   CpvInitialize(int,recvdLocal);
2055   CpvInitialize(int,recvdRemote);
2056   CpvInitialize(int,recvdProcChkp);
2057   CpvInitialize(int,recvdArrayChkp);
2058
2059   CpvAccess(procChkptBuf) = NULL;
2060   CpvAccess(buddyBuf) = NULL;
2061   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2062   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2063   CpvAccess(chkpBuf)[0] = NULL;
2064   CpvAccess(chkpBuf)[1] = NULL;
2065   CpvAccess(localProcChkpBuf)[0] = NULL;
2066   CpvAccess(localProcChkpBuf)[1] = NULL;
2067   
2068   CpvAccess(curPointer) = 0;
2069   CpvAccess(recvdLocal) = 0;
2070   CpvAccess(recvdRemote) = 0;
2071   CpvAccess(recvdProcChkp) = 0;
2072   CpvAccess(recvdArrayChkp) = 0;
2073
2074   notify_crash_fn = notify_crash;
2075
2076 #if ! CMK_CONVERSE_MPI
2077   // print pid to kill
2078 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2079 //  sleep(4);
2080 #endif
2081 #endif
2082 }
2083
2084
2085 extern "C"
2086 int CkHasCheckpoints()
2087 {
2088   return checkpointed;
2089 }
2090
2091 /// @todo: the following definitions should be moved to a separate file containing
2092 // structures and functions about fault tolerance strategies
2093
2094 /**
2095  *  * @brief: function for killing a process                                             
2096  *   */
2097 #ifdef CMK_MEM_CHECKPOINT
2098 #if CMK_HAS_GETPID
2099 void killLocal(void *_dummy,double curWallTime){
2100         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2101         if(CmiWallTimer()<killTime-1){
2102                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2103         }else{ 
2104 #if CMK_CONVERSE_MPI
2105                                 CkDieNow();
2106 #else 
2107                 kill(getpid(),SIGKILL);                                               
2108 #endif
2109         }              
2110
2111 #else
2112 void killLocal(void *_dummy,double curWallTime){
2113   CmiAbort("kill() not supported!");
2114 }
2115 #endif
2116 #endif
2117
2118 #ifdef CMK_MEM_CHECKPOINT
2119 /**
2120  * @brief: reads the file with the kill information
2121  */
2122 void readKillFile(){
2123         FILE *fp=fopen(killFile,"r");
2124         if(!fp){
2125                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2126                 return;
2127         }
2128         int proc;
2129         double sec;
2130         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2131                 if(proc == CkMyNode() && CkMyRank() == 0){
2132                         killTime = CmiWallTimer()+sec;
2133                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2134                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2135                 }
2136         }
2137         fclose(fp);
2138 }
2139
2140 #if ! CMK_CONVERSE_MPI
2141 void CkDieNow()
2142 {
2143 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2144          // ignored for non-mpi version
2145         CmiPrintf("[%d] die now.\n", CmiMyPe());
2146         killTime = CmiWallTimer()+0.001;
2147         CcdCallFnAfter(killLocal,NULL,1);
2148 #endif
2149 }
2150 #endif
2151
2152 #endif
2153
2154 #include "CkMemCheckpoint.def.h"
2155
2156