support for medium resilience
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74 CpvDeclare(int, _remoteCrashedNode);
75
76 // static, so that it is accessible from Converse part
77 int CkMemCheckPT::inRestarting = 0;
78 int CkMemCheckPT::inCheckpointing = 0;
79 int CkMemCheckPT::replicaAlive = 1;
80 int CkMemCheckPT::inLoadbalancing = 0;
81 double CkMemCheckPT::startTime;
82 char *CkMemCheckPT::stage;
83 CkCallback CkMemCheckPT::cpCallback;
84
85 int _memChkptOn = 1;                    // checkpoint is on or off
86
87 CkGroupID ckCheckPTGroupID;             // readonly
88
89 static int checkpointed = 0;
90
91 /// @todo the following declarations should be moved into a separate file for all 
92 // fault tolerant strategies
93
94 #ifdef CMK_MEM_CHECKPOINT
95 // name of the kill file that contains processes to be killed 
96 char *killFile;                                               
97 // flag for the kill file         
98 int killFlag=0;
99 // variable for storing the killing time
100 double killTime=0.0;
101 #endif
102
103 #ifdef CKLOCMGR_LOOP
104 #undef CKLOCMGR_LOOP
105 #endif
106 // loop over all CkLocMgr and do "code"
107 #define  CKLOCMGR_LOOP(code)    {       \
108   int numGroups = CkpvAccess(_groupIDTable)->size();    \
109   for(int i=0;i<numGroups;i++) {        \
110     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
111     if(obj->isLocMgr())  {      \
112       CkLocMgr *mgr = (CkLocMgr*)obj;   \
113       code      \
114     }   \
115   }     \
116  }
117
118 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
119 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
120 CpvDeclare(CkCheckPTMessage**, chkpBuf);
121 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
122 //store the checkpoint of the buddy to compare
123 //do not need the whole msg, can be the checksum
124 CpvDeclare(CkCheckPTMessage*, buddyBuf);
125 //pointer of the checkpoint going to be written
126 CpvDeclare(int, curPointer);
127 CpvDeclare(int, recvdRemote);
128 CpvDeclare(int, recvdLocal);
129 CpvDeclare(int, recvdArrayChkp);
130 CpvDeclare(int, recvdProcChkp);
131
132 bool compare(char * buf1, char * buf2);
133 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
134 // Converse function handles
135 static int askPhaseHandlerIdx;
136 static int recvPhaseHandlerIdx;
137 static int askProcDataHandlerIdx;
138 static int restartBcastHandlerIdx;
139 static int recoverProcDataHandlerIdx;
140 static int restartBeginHandlerIdx;
141 static int recvRemoteChkpHandlerIdx;
142 static int replicaDieHandlerIdx;
143 static int replicaDieBcastHandlerIdx;
144 static int replicaRecoverHandlerIdx;
145 static int recoverRemoteProcDataHandlerIdx;
146 static int recoverRemoteArrayDataHandlerIdx;
147 static int notifyHandlerIdx;
148 // compute the backup processor
149 // FIXME: avoid crashed processors
150 #if CMK_CONVERSE_MPI
151 static int pingHandlerIdx;
152 static int pingCheckHandlerIdx;
153 static int buddyDieHandlerIdx;
154 static double lastPingTime = -1;
155
156 extern "C" void mpi_restart_crashed(int pe, int rank);
157 extern "C" int  find_spare_mpirank(int pe,int partition);
158
159 void pingBuddy();
160 void pingCheckHandler();
161
162 #endif
163 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
164
165 inline int CkMemCheckPT::BuddyPE(int pe)
166 {
167   int budpe;
168 #if NODE_CHECKPOINT
169     // buddy is the processor with same rank on the next physical node
170   int r1 = CmiPhysicalRank(pe);
171   int budnode = CmiPhysicalNodeID(pe);
172   do {
173     budnode = (budnode+1)%CmiNumPhysicalNodes();
174     int *pelist;
175     int num;
176     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
177     budpe = pelist[r1 % num];
178   } while (isFailed(budpe));
179   if (budpe == pe) {
180     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
181     CmiAbort("Failed to find a buddy processor");
182   }
183 #else
184   budpe = pe;
185   while (budpe == pe || isFailed(budpe)) 
186           budpe = (budpe+1)%CkNumPes();
187 #endif
188   return budpe;
189 }
190
191 // called in array element constructor
192 // choose and register with 2 buddies for checkpoiting 
193 #if CMK_MEM_CHECKPOINT
194 void ArrayElement::init_checkpt() {
195         if (_memChkptOn == 0) return;
196         if (CkInRestarting()) {
197           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
198         }
199         // only master init checkpoint
200         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
201
202         budPEs[0] = CkMyPe();
203         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
204         CmiAssert(budPEs[0] != budPEs[1]);
205         // inform checkPTMgr
206         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
207         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
208         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
209         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
210 }
211 #endif
212
213 // entry function invoked by checkpoint mgr asking for checkpoint data
214 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
215 #if CMK_MEM_CHECKPOINT
216 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
217 //char index[128];   thisIndexMax.sprint(index);
218 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
219   CkLocMgr *locMgr = thisArray->getLocMgr();
220   CmiAssert(myRec!=NULL);
221   int size;
222   {
223         PUP::sizer p;
224         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
225         size = p.size();
226   }
227   int packSize = size/sizeof(double) +1;
228   CkArrayCheckPTMessage *msg =
229                  new (packSize, 0) CkArrayCheckPTMessage;
230   msg->len = size;
231   msg->index =thisIndexMax;
232   msg->aid = thisArrayID;
233   msg->locMgr = locMgr->getGroupID();
234   msg->cp_flag = 1;
235   {
236         PUP::toMem p(msg->packData);
237         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
238   }
239
240   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
241   checkptMgr.recvData(msg, 2, budPEs);
242   delete m;
243 #endif
244 }
245
246 // checkpoint holder class - for memory checkpointing
247 class CkMemCheckPTInfo: public CkCheckPTInfo
248 {
249   CkArrayCheckPTMessage *ckBuffer;
250 public:
251   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
252                     CkCheckPTInfo(a, loc, idx, pno)
253   {
254     ckBuffer = NULL;
255   }
256   ~CkMemCheckPTInfo() 
257   {
258     if (ckBuffer) delete ckBuffer; 
259   }
260   inline void updateBuffer(CkArrayCheckPTMessage *data) 
261   {
262     CmiAssert(data!=NULL);
263     if (ckBuffer) delete ckBuffer;
264     ckBuffer = data;
265   }    
266   inline CkArrayCheckPTMessage * getCopy()
267   {
268     if (ckBuffer == NULL) {
269       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
270       CmiAbort("Abort!");
271     }
272     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
273   }     
274   inline void updateBuddy(int b1, int b2) {
275      CmiAssert(ckBuffer);
276      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
277      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
278      CmiAssert(pNo != CkMyPe());
279   }
280   inline int getSize() { 
281      CmiAssert(ckBuffer);
282      return ckBuffer->len; 
283   }
284 };
285
286 // checkpoint holder class - for in-disk checkpointing
287 class CkDiskCheckPTInfo: public CkCheckPTInfo 
288 {
289   char *fname;
290   int bud1, bud2;
291   int len;                      // checkpoint size
292 public:
293   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
294   {
295 #if CMK_USE_MKSTEMP
296     fname = new char[64];
297     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
298     mkstemp(fname);
299 #else
300     fname=tmpnam(NULL);
301 #endif
302     bud1 = bud2 = -1;
303     len = 0;
304   }
305   ~CkDiskCheckPTInfo() 
306   {
307     remove(fname);
308   }
309   inline void updateBuffer(CkArrayCheckPTMessage *data) 
310   {
311     double t = CmiWallTimer();
312     // unpack it
313     envelope *env = UsrToEnv(data);
314     CkUnpackMessage(&env);
315     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
316     FILE *f = fopen(fname,"wb");
317     PUP::toDisk p(f);
318     CkPupMessage(p, (void **)&data);
319     // delay sync to the end because otherwise the messages are blocked
320 //    fsync(fileno(f));
321     fclose(f);
322     bud1 = data->bud1;
323     bud2 = data->bud2;
324     len = data->len;
325     delete data;
326     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
327   }
328   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
329   {
330     CkArrayCheckPTMessage *data;
331     FILE *f = fopen(fname,"rb");
332     PUP::fromDisk p(f);
333     CkPupMessage(p, (void **)&data);
334     fclose(f);
335     data->bud1 = bud1;                          // update the buddies
336     data->bud2 = bud2;
337     return data;
338   }
339   inline void updateBuddy(int b1, int b2) {
340      bud1 = b1; bud2 = b2;
341      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
342      CmiAssert(pNo != CkMyPe());
343   }
344   inline int getSize() { 
345      return len; 
346   }
347 };
348
349 CkMemCheckPT::CkMemCheckPT(int w)
350 {
351   int numnodes = 0;
352 #if NODE_CHECKPOINT
353   numnodes = CmiNumPhysicalNodes();
354 #else
355   numnodes = CkNumPes();
356 #endif
357 #if CK_NO_PROC_POOL
358   if (numnodes <= 2)
359 #else
360   if (numnodes  == 1)
361 #endif
362   {
363     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
364     _memChkptOn = 0;
365   }
366   inRestarting = 0;
367   inCheckpointing = 0;
368   recvCount = peCount = 0;
369   ackCount = 0;
370   expectCount = -1;
371   where = w;
372   replicaAlive = 1;
373 #if CMK_CONVERSE_MPI
374   void pingBuddy();
375   void pingCheckHandler();
376   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
377   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
378 #endif
379   chkpTable[0] = NULL;
380   chkpTable[1] = NULL;
381   maxIter = -1;
382   recvIterCount = 0;
383   localDecided = false;
384 }
385
386 CkMemCheckPT::~CkMemCheckPT()
387 {
388   int len = ckTable.length();
389   for (int i=0; i<len; i++) {
390     delete ckTable[i];
391   }
392 }
393
394 void CkMemCheckPT::pup(PUP::er& p) 
395
396   CBase_CkMemCheckPT::pup(p); 
397   p|cpStarter;
398   p|thisFailedPe;
399   p|failedPes;
400   p|ckCheckPTGroupID;           // recover global variable
401   p|cpCallback;                 // store callback
402   p|where;                      // where to checkpoint
403   p|peCount;
404   if (p.isUnpacking()) {
405     recvCount = 0;
406 #if CMK_CONVERSE_MPI
407     void pingBuddy();
408     void pingCheckHandler();
409     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
410     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
411 #endif
412           maxIter = -1;
413           recvIterCount = 0;
414           localDecided = false;
415   }
416 }
417
418 void CkMemCheckPT::getIter(){
419         localDecided = true;
420         localMaxIter = maxIter+1;
421         contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
422 }
423
424 //when one replica failes, decide the next chkp iter
425
426 void CkMemCheckPT::recvIter(int iter){
427         if(maxIter<iter){
428                 maxIter=iter;
429         }
430 }
431
432 void CkMemCheckPT::recvMaxIter(int iter){
433         localDecided = false;
434         CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
435 }
436
437 void CkMemCheckPT::reachChkpIter(){
438         recvIterCount++;
439         elemCount = CkCountArrayElements();
440         if(recvIterCount == elemCount){
441                 recvIterCount = 0;
442                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
443         }
444 }
445
446 void CkMemCheckPT::startChkp(){
447         CkStartMemCheckpoint(cpCallback);
448 }
449
450 // called by checkpoint mgr to restore an array element
451 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
452 {
453 #if CMK_MEM_CHECKPOINT
454   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
455   // m->index.print();
456   PUP::fromMem p(m->packData);
457   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
458   CmiAssert(mgr);
459 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
460   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
461 #else
462   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
463 #endif
464
465   // find a list of array elements bound together
466   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
467   CmiAssert(elt);
468   CkLocRec_local *rec = elt->myRec;
469   CkVec<CkMigratable *> list;
470   mgr->migratableList(rec, list);
471   CmiAssert(list.length() > 0);
472   for (int l=0; l<list.length(); l++) {
473     elt = (ArrayElement *)list[l];
474     elt->budPEs[0] = m->bud1;
475     elt->budPEs[1] = m->bud2;
476     //    reset, may not needed now
477     // for now.
478     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
479       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
480       if (c) c->redNo = 0;
481     }
482   }
483 #endif
484 }
485
486 // return 1 if pe is a crashed processor
487 int CkMemCheckPT::isFailed(int pe)
488 {
489   for (int i=0; i<failedPes.length(); i++)
490     if (failedPes[i] == pe) return 1;
491   return 0;
492 }
493
494 // add pe into history list of all failed processors
495 void CkMemCheckPT::failed(int pe)
496 {
497   if (isFailed(pe)) return;
498   failedPes.push_back(pe);
499 }
500
501 int CkMemCheckPT::totalFailed()
502 {
503   return failedPes.length();
504 }
505
506 // create an checkpoint entry for array element of aid with index.
507 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
508 {
509   // error check, no duplicate
510   int idx, len = ckTable.size();
511   for (idx=0; idx<len; idx++) {
512     CkCheckPTInfo *entry = ckTable[idx];
513     if (index == entry->index) {
514       if (loc == entry->locMgr) {
515           // bindTo array elements
516           return;
517       }
518         // for array inheritance, the following check may fail
519         // because ArrayElement constructor of all superclasses are called
520       if (aid == entry->aid) {
521         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
522         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
523       }
524     }
525   }
526   CkCheckPTInfo *newEntry;
527   if (where == CkCheckPoint_inMEM)
528     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
529   else
530     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
531   ckTable.push_back(newEntry);
532   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
533 }
534
535 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
536 {
537 #if !CMK_CHKP_ALL       
538   int buddy = msg->bud1;
539   if (buddy == CkMyPe()) buddy = msg->bud2;
540   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
541   recvData(msg);
542     // ack
543   thisProxy[buddy].gotData();
544 #else
545   chkpTable[0] = NULL;
546   chkpTable[1] = NULL;
547   recvArrayCheckpoint(msg);
548   thisProxy[msg->bud2].gotData();
549 #endif
550 }
551
552 // loop through my checkpoint table and ask checkpointed array elements
553 // to send me checkpoint data.
554 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
555 {
556   checkpointed = 1;
557   cpCallback = cb;
558   cpStarter = starter;
559   inCheckpointing = 1;
560   if (CkMyPe() == cpStarter) {
561     startTime = CmiWallTimer();
562     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
563   }
564 #if CMK_CONVERSE_MPI
565   if(CmiNumPartition()==1)
566 #endif    
567   {
568
569 #if !CMK_CHKP_ALL
570   int len = ckTable.length();
571   for (int i=0; i<len; i++) {
572     CkCheckPTInfo *entry = ckTable[i];
573       // always let the bigger number processor send request
574     //if (CkMyPe() < entry->pNo) continue;
575       // always let the smaller number processor send request, may on same proc
576     if (!isMaster(entry->pNo)) continue;
577       // call inmem_checkpoint to the array element, ask it to send
578       // back checkpoint data via recvData().
579     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
580     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
581   }
582     // if my table is empty, then I am done
583   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
584 #else
585   startArrayCheckpoint();
586 #endif
587         sendProcData();
588   }
589 #if CMK_CONVERSE_MPI
590   else
591   {
592         startCheckpoint();
593   }
594 #endif
595   // pack and send proc level data
596 }
597
598 class MemElementPacker : public CkLocIterator{
599         private:
600                 CkLocMgr *locMgr;
601                 PUP::er &p;
602         public:
603                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
604                 void addLocation(CkLocation &loc){
605                         CkArrayIndexMax idx = loc.getIndex();
606                         CkGroupID gID = locMgr->ckGetGroupID();
607                         p|gID;
608                         p|idx;
609                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
610                 }
611 };
612
613 void pupAllElements(PUP::er &p){
614 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
615         int numElements;
616         if(!p.isUnpacking()){
617                 numElements = CkCountArrayElements();
618         }
619         p | numElements;
620         if(!p.isUnpacking()){
621                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
622         }
623 #endif
624 }
625
626 void CkMemCheckPT::startArrayCheckpoint(){
627 #if CMK_CHKP_ALL
628         int size;
629         {
630                 PUP::sizer psizer;
631                 pupAllElements(psizer);
632                 size = psizer.size();
633         }
634         int packSize = size/sizeof(double)+1;
635  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
636         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
637         msg->len = size;
638         msg->cp_flag = 1;
639         int budPEs[2];
640         msg->bud1=CkMyPe();
641         msg->bud2=ChkptOnPe(CkMyPe());
642         {
643                 PUP::toMem p(msg->packData);
644                 pupAllElements(p);
645         }
646         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
647         if(chkpTable[0]) delete chkpTable[0];
648         chkpTable[0] = msg;
649         //send the checkpoint to my 
650         recvCount++;
651 #endif
652 }
653
654 void CkMemCheckPT::startCheckpoint(){
655 #if CMK_CONVERSE_MPI
656         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
657     CkPrintf("in start checkpointing!!!!\n");
658   int size;
659   {
660     PUP::sizer p;
661     _handleProcData(p,CmiFalse);
662     size = p.size();
663   }
664         int packSize = size/sizeof(double)+1;
665         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
666         procMsg->len = size;
667         procMsg->cp_flag = 1;
668         {
669                 PUP::toMem p(procMsg->packData);
670                 _handleProcData(p,CmiFalse);
671         }
672         int pointer = CpvAccess(curPointer);
673         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
674                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
675         
676         {
677                 PUP::sizer psizer;
678                 pupAllElements(psizer);
679                 size = psizer.size();
680         }
681         packSize = size/sizeof(double)+1;
682         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
683         msg->len = size;
684         msg->cp_flag = 1;
685         {
686                 PUP::toMem p(msg->packData);
687                 pupAllElements(p);
688         }
689         pointer = CpvAccess(curPointer);
690         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
691     CkPrintf("start checkpointing!!!!\n");
692         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
693                 CpvAccess(chkpBuf)[pointer] = msg;
694         if(CkReplicaAlive()==1){
695                 CpvAccess(recvdLocal) = 1;
696                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
697                 CkPackMessage(&env);
698                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
699                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
700         }
701         if(CpvAccess(recvdRemote)==1){
702                 //compare the checkpoint 
703           int size = CpvAccess(chkpBuf)[pointer]->len;
704           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
705                 thisProxy[CkMyPe()].doneComparison(true);
706           }else{
707                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
708                 thisProxy[CkMyPe()].doneComparison(false);
709           }
710   }
711         else{
712                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
713                         {       
714                                 int pointer = CpvAccess(curPointer);
715                                 //send the proc data
716                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
717                                 procMsg->pointer = pointer;
718                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
719                                 CkPackMessage(&env);
720                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
721                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
722                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
723                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
724                                 }
725                         }
726                         //send the array checkpoint data
727                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
728                         msg->pointer = CpvAccess(curPointer);
729                         envelope * env = (envelope *)(UsrToEnv(msg));
730                         CkPackMessage(&env);
731                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
732                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
733                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
734                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
735                         //can continue work, no need to wait for my replica
736                 }
737         }
738 #endif
739 }
740
741 void CkMemCheckPT::doneComparison(bool ret){
742         int _ret;
743         if(!ret){
744         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
745                 _ret = 0;
746         }else{
747                 _ret = 1;
748         }
749         inCheckpointing = 0;
750         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
751         contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
752 }
753
754 void CkMemCheckPT::doneRComparison(int ret){
755         CpvAccess(recvdRemote) = 0;
756         CpvAccess(recvdLocal) = 0;
757 //      if(CpvAccess(curPointer) == 0){
758         if(ret==1){
759         CpvAccess(curPointer)^=1;
760                 if(CkMyPe() == 0){
761                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
762                 }
763                 CKLOCMGR_LOOP(mgr->resumeFromChkp(););
764         }
765         else{
766         CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
767                 RollBack();
768         }
769 }
770
771 void CkMemCheckPT::RollBack(){
772         //restore group data
773         checkpointed = 0;
774         CkMemCheckPT::inRestarting = 1;
775         int pointer = CpvAccess(curPointer)^1;//use the previous one
776     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
777         PUP::fromMem p(chkpMsg->packData);      
778         
779         //destroy array elements
780         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
781           int numGroups = CkpvAccess(_groupIDTable)->size();
782           for(int i=0;i<numGroups;i++) {
783                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
784                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
785                 obj->flushStates();
786                 obj->ckJustMigrated();
787           }
788         //restore array elements
789         
790         int numElements;
791         p|numElements;
792         
793         if(p.isUnpacking()){
794                 for(int i=0;i<numElements;i++){
795                 //for(int i=0;i<1;i++){
796                         CkGroupID gID;
797                         CkArrayIndex idx;
798                         p|gID;
799                         p|idx;
800                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
801                         CmiAssert(mgr);
802                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
803                 }
804         }
805         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
806         contribute(cb);
807 }
808
809 void CkMemCheckPT::notifyReplicaDie(int pe){
810         //CkPrintf("[%d] receive replica die\n",CkMyPe());
811         replicaAlive = 0;
812         CpvAccess(_remoteCrashedNode) = pe;
813 }
814
815 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
816 {
817 #if CMK_CHKP_ALL
818         int idx = 1;
819         if(msg->bud1 == CkMyPe()){
820                 idx = 0;
821         }
822         int isChkpting = msg->cp_flag;
823         if(isChkpting == 1){
824                 if(chkpTable[idx]) delete chkpTable[idx];
825         }
826         chkpTable[idx] = msg;
827         if(isChkpting){
828                 recvCount++;
829                 if(recvCount == 2){
830                   if (where == CkCheckPoint_inMEM) {
831                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
832                   }
833                   else if (where == CkCheckPoint_inDISK) {
834                         // another barrier for finalize the writing using fsync
835                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
836                         contribute(0,NULL,CkReduction::sum_int,localcb);
837                   }
838                   else
839                         CmiAbort("Unknown checkpoint scheme");
840                   recvCount = 0;
841                 }
842         }
843 #endif
844 }
845
846 // don't handle array elements
847 static inline void _handleProcData(PUP::er &p, CmiBool create)
848 {
849     // save readonlys, and callback BTW
850     CkPupROData(p);
851
852     // save mainchares 
853     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
854         
855 #ifndef CMK_CHARE_USE_PTR
856     // save non-migratable chare
857     CkPupChareData(p);
858 #endif
859
860     // save groups into Groups.dat
861     CkPupGroupData(p,create);
862
863     // save nodegroups into NodeGroups.dat
864     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
865 }
866
867 void CkMemCheckPT::sendProcData()
868 {
869   // find out size of buffer
870   int size;
871   {
872     PUP::sizer p;
873     _handleProcData(p,CmiTrue);
874     size = p.size();
875   }
876   int packSize = size;
877   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
878   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
879   {
880     PUP::toMem p(msg->packData);
881     _handleProcData(p,CmiTrue);
882   }
883   msg->pe = CkMyPe();
884   msg->len = size;
885   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
886   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
887 }
888
889 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
890 {
891   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
892   CpvAccess(procChkptBuf) = msg;
893   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
894   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
895 }
896
897 // ArrayElement call this function to give us the checkpointed data
898 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
899 {
900   int len = ckTable.length();
901   int idx;
902   for (idx=0; idx<len; idx++) {
903     CkCheckPTInfo *entry = ckTable[idx];
904     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
905   }
906   CkAssert(idx < len);
907   int isChkpting = msg->cp_flag;
908   ckTable[idx]->updateBuffer(msg);
909   if (isChkpting) {
910       // all my array elements have returned their inmem data
911       // inform starter processor that I am done.
912     recvCount ++;
913     if (recvCount == ckTable.length()) {
914       if (where == CkCheckPoint_inMEM) {
915         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
916       }
917       else if (where == CkCheckPoint_inDISK) {
918         // another barrier for finalize the writing using fsync
919         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
920         contribute(0,NULL,CkReduction::sum_int,localcb);
921       }
922       else
923         CmiAbort("Unknown checkpoint scheme");
924       recvCount = 0;
925     } 
926   }
927 }
928
929 // only used in disk checkpointing
930 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
931 {
932   delete m;
933 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
934   system("sync");
935 #endif
936   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
937 }
938
939 // only is called on cpStarter when checkpoint is done
940 void CkMemCheckPT::cpFinish()
941 {
942   CmiAssert(CkMyPe() == cpStarter);
943   peCount++;
944     // now that all processors have finished, activate callback
945   if (peCount == 2) 
946 {
947     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
948     peCount = 0;
949     thisProxy.report();
950   }
951 }
952
953 // for debugging, report checkpoint info
954 void CkMemCheckPT::report()
955 {
956         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
957         inCheckpointing = 0;
958 #if !CMK_CHKP_ALL
959   int objsize = 0;
960   int len = ckTable.length();
961   for (int i=0; i<len; i++) {
962     CkCheckPTInfo *entry = ckTable[i];
963     CmiAssert(entry);
964     objsize += entry->getSize();
965   }
966   CmiAssert(CpvAccess(procChkptBuf));
967   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
968 #else
969   if(CkMyPe()==0)
970   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
971 #endif
972 }
973
974 /*****************************************************************************
975                         RESTART Procedure
976 *****************************************************************************/
977
978 // master processor of two buddies
979 inline int CkMemCheckPT::isMaster(int buddype)
980 {
981 #if 0
982   int mype = CkMyPe();
983 //CkPrintf("ismaster: %d %d\n", pe, mype);
984   if (CkNumPes() - totalFailed() == 2) {
985     return mype > buddype;
986   }
987   for (int i=1; i<CkNumPes(); i++) {
988     int me = (buddype+i)%CkNumPes();
989     if (isFailed(me)) continue;
990     if (me == mype) return 1;
991     else return 0;
992   }
993   return 0;
994 #else
995     // smaller one
996   int mype = CkMyPe();
997 //CkPrintf("ismaster: %d %d\n", pe, mype);
998   if (CkNumPes() - totalFailed() == 2) {
999     return mype < buddype;
1000   }
1001 #if NODE_CHECKPOINT
1002   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1003   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1004 #else
1005   for (int i=1; i<CkNumPes(); i++) {
1006 #endif
1007     int me = (mype+i)%CkNumPes();
1008     if (isFailed(me)) continue;
1009     if (me == buddype) return 1;
1010     else return 0;
1011   }
1012   return 0;
1013 #endif
1014 }
1015
1016
1017
1018 #if 0
1019 // helper class to pup all elements that belong to same ckLocMgr
1020 class ElementDestoryer : public CkLocIterator {
1021 private:
1022         CkLocMgr *locMgr;
1023 public:
1024         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1025         void addLocation(CkLocation &loc) {
1026                 CkArrayIndex idx=loc.getIndex();
1027                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1028                 loc.destroy();
1029         }
1030 };
1031 #endif
1032
1033 // restore the bitmap vector for LB
1034 void CkMemCheckPT::resetLB(int diepe)
1035 {
1036 #if CMK_LBDB_ON
1037   int i;
1038   char *bitmap = new char[CkNumPes()];
1039   // set processor available bitmap
1040   get_avail_vector(bitmap);
1041
1042   for (i=0; i<failedPes.length(); i++)
1043     bitmap[failedPes[i]] = 0; 
1044   bitmap[diepe] = 0;
1045
1046 #if CK_NO_PROC_POOL
1047   set_avail_vector(bitmap);
1048 #endif
1049
1050   // if I am the crashed pe, rebuild my failedPEs array
1051   if (CkMyNode() == diepe)
1052     for (i=0; i<CkNumPes(); i++) 
1053       if (bitmap[i]==0) failed(i);
1054
1055   delete [] bitmap;
1056 #endif
1057 }
1058
1059 static double restartT;
1060
1061 // in case when failedPe dies, everybody go through its checkpoint table:
1062 // destory all array elements
1063 // recover lost buddies
1064 // reconstruct all array elements from check point data
1065 // called on all processors
1066 void CkMemCheckPT::restart(int diePe)
1067 {
1068 #if CMK_MEM_CHECKPOINT
1069   double curTime = CmiWallTimer();
1070   if (CkMyPe() == diePe){
1071            restartT = CmiWallTimer();
1072     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1073   }
1074   stage = (char*)"resetLB";
1075   startTime = curTime;
1076   if (CkMyPe() == diePe)
1077     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1078
1079 #if CK_NO_PROC_POOL
1080   failed(diePe);        // add into the list of failed pes
1081 #endif
1082   thisFailedPe = diePe;
1083
1084   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1085
1086   inRestarting = 1;
1087                                                                                 
1088   // disable load balancer's barrier
1089   //if (CkMyPe() != diePe) resetLB(diePe);
1090
1091   CKLOCMGR_LOOP(mgr->startInserting(););
1092
1093
1094   if(CmiNumPartition()==1){
1095         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1096   }else{
1097         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1098         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1099   }
1100 #endif
1101 }
1102
1103 // loally remove all array elements
1104 void CkMemCheckPT::removeArrayElements()
1105 {
1106 #if CMK_MEM_CHECKPOINT
1107   int len = ckTable.length();
1108   double curTime = CmiWallTimer();
1109   if (CkMyPe() == thisFailedPe) 
1110     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1111   stage = (char*)"removeArrayElements";
1112   startTime = curTime;
1113
1114 //  if (cpCallback.isInvalid()) 
1115 //        CkPrintf("invalid pe %d\n",CkMyPe());
1116 //        CkAbort("Didn't set restart callback\n");;
1117   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1118
1119   // get rid of all buffering and remote recs
1120   // including destorying all array elements
1121 #if CK_NO_PROC_POOL  
1122         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1123 #else
1124         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1125 #endif
1126   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1127 #endif
1128 }
1129
1130 // flush state in reduction manager
1131 void CkMemCheckPT::resetReductionMgr()
1132 {
1133   if (CkMyPe() == thisFailedPe) 
1134     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1135   int numGroups = CkpvAccess(_groupIDTable)->size();
1136   for(int i=0;i<numGroups;i++) {
1137     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1138     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1139     obj->flushStates();
1140     obj->ckJustMigrated();
1141   }
1142   // reset again
1143   //CpvAccess(_qd)->flushStates();
1144   if(CmiNumPartition()==1){
1145         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1146   }
1147   else
1148         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1149 }
1150
1151 // recover the lost buddies
1152 void CkMemCheckPT::recoverBuddies()
1153 {
1154   int idx;
1155   int len = ckTable.length();
1156   // ready to flush reduction manager
1157   // cannot be CkMemCheckPT::restart because destory will modify states
1158   double curTime = CmiWallTimer();
1159   if (CkMyPe() == thisFailedPe)
1160   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1161   stage = (char *)"recoverBuddies";
1162   if (CkMyPe() == thisFailedPe)
1163   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1164   startTime = curTime;
1165
1166   // recover buddies
1167   expectCount = 0;
1168 #if !CMK_CHKP_ALL
1169   for (idx=0; idx<len; idx++) {
1170     CkCheckPTInfo *entry = ckTable[idx];
1171     if (entry->pNo == thisFailedPe) {
1172 #if CK_NO_PROC_POOL
1173       // find a new buddy
1174       int budPe = BuddyPE(CkMyPe());
1175 #else
1176       int budPe = thisFailedPe;
1177 #endif
1178       CkArrayCheckPTMessage *msg = entry->getCopy();
1179       msg->bud1 = budPe;
1180       msg->bud2 = CkMyPe();
1181       msg->cp_flag = 0;            // not checkpointing
1182       thisProxy[budPe].recoverEntry(msg);
1183       expectCount ++;
1184     }
1185   }
1186 #else
1187   //send to failed pe
1188   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1189 #if CK_NO_PROC_POOL
1190       // find a new buddy
1191       int budPe = BuddyPE(CkMyPe());
1192 #else
1193       int budPe = thisFailedPe;
1194 #endif
1195       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1196       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1197           msg->cp_flag = 0;            // not checkpointing
1198       msg->bud1 = budPe;
1199       msg->bud2 = CkMyPe();
1200       thisProxy[budPe].recoverEntry(msg);
1201       expectCount ++;
1202   }
1203 #endif
1204
1205   if (expectCount == 0) {
1206           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1207   }
1208   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1209 }
1210
1211 void CkMemCheckPT::gotData()
1212 {
1213   ackCount ++;
1214   if (ackCount == expectCount) {
1215     ackCount = 0;
1216     expectCount = -1;
1217     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1218     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1219   }
1220 }
1221
1222 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1223 {
1224
1225   for (int i=0; i<n; i++) {
1226     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1227     mgr->updateLocation(idx[i], nowOnPe);
1228   }
1229         thisProxy[nowOnPe].gotReply();
1230 }
1231
1232 // restore array elements
1233 void CkMemCheckPT::recoverArrayElements()
1234 {
1235   double curTime = CmiWallTimer();
1236   int len = ckTable.length();
1237   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1238   stage = (char *)"recoverArrayElements";
1239   if (CkMyPe() == thisFailedPe)
1240   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1241   startTime = curTime;
1242  int flag = 0;
1243   // recover all array elements
1244   int count = 0;
1245
1246 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1247   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1248   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1249 #endif
1250
1251 #if !CMK_CHKP_ALL
1252   for (int idx=0; idx<len; idx++)
1253   {
1254     CkCheckPTInfo *entry = ckTable[idx];
1255 #if CK_NO_PROC_POOL
1256     // the bigger one will do 
1257 //    if (CkMyPe() < entry->pNo) continue;
1258     if (!isMaster(entry->pNo)) continue;
1259 #else
1260     // smaller one do it, which has the original object
1261     if (CkMyPe() == entry->pNo+1 || 
1262         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1263 #endif
1264 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1265
1266     entry->updateBuddy(CkMyPe(), entry->pNo);
1267     CkArrayCheckPTMessage *msg = entry->getCopy();
1268     // gzheng
1269     //thisProxy[CkMyPe()].inmem_restore(msg);
1270     inmem_restore(msg);
1271 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1272     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1273     int homePe = mgr->homePe(msg->index);
1274     if (homePe != CkMyPe()) {
1275       gmap[homePe].push_back(msg->locMgr);
1276       imap[homePe].push_back(msg->index);
1277     }
1278 #endif
1279     CkFreeMsg(msg);
1280     count ++;
1281   }
1282 #else
1283         double * packData;
1284         if(CmiNumPartition()==1){
1285                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1286                 packData = msg->packData;
1287         }
1288         else{
1289                 int pointer = CpvAccess(curPointer);
1290                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1291                 packData = msg->packData;
1292         }
1293 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1294         recoverAll(packData,gmap,imap);
1295 #else
1296         recoverAll(packData);
1297 #endif
1298 #endif
1299   curTime = CmiWallTimer();
1300   //if (CkMyPe() == thisFailedPe)
1301   if (CkMyPe() == 0)
1302         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1303 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1304   for (int i=0; i<CkNumPes(); i++) {
1305     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1306       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1307         flag++; 
1308         }
1309   }
1310   delete [] imap;
1311   delete [] gmap;
1312 #endif
1313   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1314
1315   CKLOCMGR_LOOP(mgr->doneInserting(););
1316
1317   // _crashedNode = -1;
1318   CpvAccess(_crashedNode) = -1;
1319   inRestarting = 0;
1320 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1321   if (CkMyPe() == 0)
1322     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1323 #else
1324 if(flag == 0)
1325 {
1326     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1327 }
1328 #endif
1329 }
1330
1331 void CkMemCheckPT::gotReply(){
1332     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1333 }
1334
1335 void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1336 #if CMK_CHKP_ALL
1337         PUP::fromMem p(packData);
1338         int numElements;
1339         p|numElements;
1340         if(p.isUnpacking()){
1341                 for(int i=0;i<numElements;i++){
1342                         CkGroupID gID;
1343                         CkArrayIndex idx;
1344                         p|gID;
1345                         p|idx;
1346                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1347                         int homePe = mgr->homePe(idx);
1348 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1349                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1350 #else
1351                         if(CmiNumPartition()==1)
1352                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1353                         else{
1354                                 if(CkMyPe()==thisFailedPe){
1355                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1356                                 }
1357                         else{
1358                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1359                                 }
1360                         }
1361 #endif
1362 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1363                         homePe = mgr->homePe(idx);
1364                         if (homePe != CkMyPe()) {
1365                           gmap[homePe].push_back(gID);
1366                           imap[homePe].push_back(idx);
1367                         }
1368 #endif
1369                 }
1370         }
1371 #endif
1372 }
1373
1374
1375 // on every processor
1376 // turn load balancer back on
1377 void CkMemCheckPT::finishUp()
1378 {
1379   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1380   //CKLOCMGR_LOOP(mgr->doneInserting(););
1381   
1382   if (CkMyPe() == thisFailedPe)
1383   {
1384        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1385        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1386   }
1387   CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1388         
1389 #if CMK_CONVERSE_MPI    
1390   if(CmiNumPartition()!=1){
1391         CpvAccess(recvdProcChkp) = 0;
1392         CpvAccess(recvdArrayChkp) = 0;
1393         CpvAccess(curPointer)^=1;
1394         //notify my replica, restart is done
1395    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1396    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1397    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1398   }
1399    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1400          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1401    }
1402 #endif
1403
1404 #if CK_NO_PROC_POOL
1405 #if NODE_CHECKPOINT
1406   int numnodes = CmiNumPhysicalNodes();
1407 #else
1408   int numnodes = CkNumPes();
1409 #endif
1410   if (numnodes-totalFailed() <=2) {
1411     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1412     _memChkptOn = 0;
1413   }
1414 #endif
1415 }
1416
1417 void CkMemCheckPT::recoverFromSoftFailure()
1418 {
1419         inRestarting = 0;
1420         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1421 }
1422 // called only on 0
1423 void CkMemCheckPT::quiescence(CkCallback &cb)
1424 {
1425   static int pe_count = 0;
1426   pe_count ++;
1427   CmiAssert(CkMyPe() == 0);
1428   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1429   if (pe_count == CkNumPes()) {
1430     pe_count = 0;
1431     cb.send();
1432   }
1433 }
1434
1435 // User callable function - to start a checkpoint
1436 // callback cb is used to pass control back
1437 void CkStartMemCheckpoint(CkCallback &cb)
1438 {
1439 #if CMK_MEM_CHECKPOINT
1440         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1441   if (_memChkptOn == 0) {
1442     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1443     cb.send();
1444     return;
1445   }
1446   if (CkInRestarting()) {
1447       // trying to checkpointing during restart
1448     cb.send();
1449     return;
1450   }
1451     // store user callback and user data
1452   CkMemCheckPT::cpCallback = cb;
1453
1454     // broadcast to start check pointing
1455   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1456   checkptMgr.doItNow(CkMyPe(), cb);
1457 #else
1458   // when mem checkpoint is disabled, invike cb immediately
1459   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1460   cb.send();
1461 #endif
1462 }
1463
1464 void CkRestartCheckPoint(int diePe)
1465 {
1466 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1467   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1468   // broadcast
1469   checkptMgr.restart(diePe);
1470 }
1471
1472 static int _diePE = -1;
1473
1474 // callback function used locally by ccs handler
1475 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1476 {
1477 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1478   CkRestartCheckPoint(_diePE);
1479 }
1480
1481
1482 // called on crashed PE
1483 static void restartBeginHandler(char *msg)
1484 {
1485   CmiFree(msg);
1486 #if CMK_MEM_CHECKPOINT
1487 #if CMK_USE_BARRIER
1488         if(CkMyPe()!=_diePE){
1489                 printf("restar begin on %d\n",CkMyPe());
1490                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1491                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1492                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1493         }else{
1494         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1495         CkRestartCheckPointCallback(NULL, NULL);
1496         }
1497 #else
1498   static int count = 0;
1499   CmiAssert(CkMyPe() == _diePE);
1500   count ++;
1501   if (count == CkNumPes()) {
1502           printf("restart begin on %d\n",CkMyPe());
1503     CkRestartCheckPointCallback(NULL, NULL);
1504     count = 0;
1505   }
1506 #endif
1507 #endif
1508 }
1509
1510 extern void _discard_charm_message();
1511 extern void _resume_charm_message();
1512
1513 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1514         return data;
1515 }
1516
1517 static void restartBcastHandler(char *msg)
1518 {
1519 #if CMK_MEM_CHECKPOINT
1520   // advance phase counter
1521   CkMemCheckPT::inRestarting = 1;
1522   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1523   // gzheng
1524   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1525
1526   if (CkMyPe()==_diePE)
1527     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1528
1529   // reset QD counters
1530 /*  gzheng
1531   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1532 */
1533
1534 /*  gzheng
1535   if (CkMyPe()==_diePE)
1536       CkRestartCheckPointCallback(NULL, NULL);
1537 */
1538   CmiFree(msg);
1539
1540   _resume_charm_message();
1541
1542     // reduction
1543   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1544   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1545 #if CMK_USE_BARRIER
1546         //CmiPrintf("before reduce\n"); 
1547         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1548         //CmiPrintf("after reduce\n");  
1549 #else
1550   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1551 #endif 
1552  checkpointed = 0;
1553 #endif
1554 }
1555
1556 extern void _initDone();
1557
1558 bool compare(char * buf1, char *buf2){
1559         //buf1 my copy, buf2 from another one 
1560 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1561         PUP::checker pchecker(buf1,buf2);
1562         pchecker.skip();
1563         
1564         int numElements;
1565         pchecker|numElements;
1566 //      CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1567         for(int i=0;i<numElements;i++){
1568         //for(int i=0;i<1;i++){
1569                 CkGroupID gID;
1570                 CkArrayIndex idx;
1571                 
1572                 pchecker|gID;
1573                 pchecker|idx;
1574                 
1575 //              CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1576                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1577                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1578 //              CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1579         }
1580         if(pchecker.getResult()){
1581                 CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
1582         }else{
1583                 CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
1584         }
1585         return pchecker.getResult();
1586 }
1587
1588 static void recvRemoteChkpHandler(char *msg){
1589    envelope *env = (envelope *)msg;
1590    CkUnpackMessage(&env);
1591    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1592   if(CpvAccess(recvdLocal)==1){
1593           int pointer = CpvAccess(curPointer);
1594           int size = CpvAccess(chkpBuf)[pointer]->len;
1595           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1596           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1597                         checkptMgr[CkMyPe()].doneComparison(true);
1598           }else
1599           {
1600                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1601                         checkptMgr[CkMyPe()].doneComparison(false);
1602           }
1603   }else{
1604           CpvAccess(recvdRemote) = 1;
1605           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1606           CpvAccess(buddyBuf) = chkpMsg;
1607   }  
1608 }
1609
1610 static void replicaRecoverHandler(char *msg){
1611         CpvAccess(_remoteCrashedNode) = -1;
1612         CkMemCheckPT::replicaAlive = 1;
1613     bool ret = true;
1614         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1615         checkptMgr[CkMyPe()].doneComparison(ret);
1616 }
1617
1618 static void replicaDieHandler(char * msg){
1619 #if CMK_CONVERSE_MPI    
1620         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1621         CpvAccess(_remoteCrashedNode) = diePe;
1622         CkMemCheckPT::replicaAlive = 0;
1623         find_spare_mpirank(diePe,CmiMyPartition()^1);
1624     if(CkMyPe()==diePe){
1625       CkPrintf("pe %d in replicad word die\n",diePe);
1626             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1627     }
1628 #endif
1629         //broadcast to my partition to get local max iter
1630     if(CkMyPe()==diePe){
1631                 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1632                 checkptMgr.getIter();
1633         }
1634     //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1635     //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1636 }
1637
1638
1639 static void replicaDieBcastHandler(char *msg){
1640         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1641         CpvAccess(_remoteCrashedNode) = diePe;
1642         CkMemCheckPT::replicaAlive = 0;
1643 }
1644
1645 static void recoverRemoteProcDataHandler(char *msg){
1646    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1647    envelope *env = (envelope *)msg;
1648    CkUnpackMessage(&env);
1649    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1650         
1651    //store the checkpoint
1652         int pointer = procMsg->pointer;
1653
1654
1655    if(CkMyPe()==CpvAccess(_crashedNode)){
1656            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1657            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1658            PUP::fromMem p(procMsg->packData);
1659            _handleProcData(p,CmiTrue);
1660            _initDone();
1661    }
1662    else{
1663            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1664            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1665            //_handleProcData(p,CmiFalse);
1666    }
1667    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1668    CKLOCMGR_LOOP(mgr->startInserting(););
1669    
1670    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1671    CpvAccess(recvdProcChkp) =1;
1672         if(CpvAccess(recvdArrayChkp)==1){
1673                 _resume_charm_message();
1674                 _diePE = CpvAccess(_crashedNode);
1675                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1676                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1677                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1678         }
1679 }
1680
1681 static void recoverRemoteArrayDataHandler(char *msg){
1682   if(CkMyPe()==CpvAccess(_crashedNode)) 
1683   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1684    envelope *env = (envelope *)msg;
1685    CkUnpackMessage(&env);
1686    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1687         
1688    //store the checkpoint
1689         int pointer = chkpMsg->pointer;
1690         CpvAccess(curPointer) = pointer;
1691         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1692         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1693    CpvAccess(recvdArrayChkp) =1;
1694         CkMemCheckPT::inRestarting = 1;
1695         if(CpvAccess(recvdProcChkp) == 1){
1696                 _resume_charm_message();
1697                 _diePE = CpvAccess(_crashedNode);
1698                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1699                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1700                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1701                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1702         }
1703 }
1704
1705 static void recvPhaseHandler(char * msg)
1706 {
1707         CpvAccess(_curRestartPhase)--;
1708         CkMemCheckPT::inRestarting = 1;
1709         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1710   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1711   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1712   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1713 //              CmiPrintf("start ping check handler\n");
1714 //       }
1715         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1716   // }
1717    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1718    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1719
1720 }
1721 // called on crashed processor
1722 static void recoverProcDataHandler(char *msg)
1723 {
1724 #if CMK_MEM_CHECKPOINT
1725    int i;
1726    envelope *env = (envelope *)msg;
1727    CkUnpackMessage(&env);
1728    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1729    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1730    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1731    PUP::fromMem p(procMsg->packData);
1732    _handleProcData(p,CmiTrue);
1733
1734    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1735    // gzheng
1736    CKLOCMGR_LOOP(mgr->startInserting(););
1737
1738    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1739    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1740    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1741    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1742
1743    _initDone();
1744 //   CpvAccess(_qd)->flushStates();
1745    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1746 #endif
1747 }
1748 //for replica, got the phase number from my neighbor processor in the current partition
1749 static void askPhaseHandler(char *msg)
1750 {
1751 #if CMK_MEM_CHECKPOINT
1752         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1753     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1754         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1755     CmiSetHandler(msg, recvPhaseHandlerIdx);
1756         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1757 #endif
1758 }
1759 // called on its backup processor
1760 // get backup message buffer and sent to crashed processor
1761 static void askProcDataHandler(char *msg)
1762 {
1763 #if CMK_MEM_CHECKPOINT
1764     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1765     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1766     if (CpvAccess(procChkptBuf) == NULL)  {
1767       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1768       CkAbort("no checkpoint found");
1769     }
1770     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1771     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1772
1773     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1774
1775     CkPackMessage(&env);
1776     CmiSetHandler(env, recoverProcDataHandlerIdx);
1777     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1778     CpvAccess(procChkptBuf) = NULL;
1779     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1780 #endif
1781 }
1782
1783 // called on PE 0
1784 void qd_callback(void *m)
1785 {
1786    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1787    CkFreeMsg(m);
1788    if(CmiNumPartition()==1){
1789 #ifdef CMK_SMP
1790            for(int i=0;i<CmiMyNodeSize();i++){
1791            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1792            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1793                 CmiSetHandler(msg, askProcDataHandlerIdx);
1794                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1795                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1796            }
1797            return;
1798 #endif
1799            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1800            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1801            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1802            CmiSetHandler(msg, askProcDataHandlerIdx);
1803            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1804            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1805         }
1806    else{
1807                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1808                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1809                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1810    }
1811 }
1812
1813 // on crashed node
1814 void CkMemRestart(const char *dummy, CkArgMsg *args)
1815 {
1816 #if CMK_MEM_CHECKPOINT
1817    _diePE = CmiMyNode();
1818    CpvAccess( _crashedNode )= CmiMyNode();
1819    CkMemCheckPT::inRestarting = 1;
1820    _discard_charm_message();
1821   
1822   /*if(CmiMyRank()==0){
1823     CkCallback cb(qd_callback);
1824     CkStartQD(cb);
1825     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1826   }*/
1827    if(CmiNumPartition()==1){
1828            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1829                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1830                 restartT = CmiWallTimer();
1831            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1832            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1833            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1834            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1835            CmiSetHandler(msg, askProcDataHandlerIdx);
1836            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1837            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1838    }
1839    else{
1840                 CkCallback cb(qd_callback);
1841                 CkStartQD(cb);
1842    }
1843 #else
1844    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1845 #endif
1846 }
1847
1848 // can be called in other files
1849 // return true if it is in restarting
1850 extern "C"
1851 int CkInRestarting()
1852 {
1853 #if CMK_MEM_CHECKPOINT
1854   if (CpvAccess( _crashedNode)!=-1) return 1;
1855   // gzheng
1856   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1857   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1858   return CkMemCheckPT::inRestarting;
1859 #else
1860   return 0;
1861 #endif
1862 }
1863
1864 extern "C"
1865 int CkReplicaAlive()
1866 {
1867 //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1868 //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1869         return CkMemCheckPT::replicaAlive;
1870
1871         /*if(CkMemCheckPT::replicaDead==1)
1872                 return 0;
1873         else            
1874                 return 1;*/
1875 }
1876
1877 extern "C"
1878 int CkInCheckpointing()
1879 {
1880         return CkMemCheckPT::inCheckpointing;
1881 }
1882
1883 extern "C"
1884 void CkSetInLdb(){
1885 #if CMK_MEM_CHECKPOINT
1886         CkMemCheckPT::inLoadbalancing = 1;
1887 #endif
1888 }
1889
1890 extern "C"
1891 int CkInLdb(){
1892 #if CMK_MEM_CHECKPOINT
1893         return CkMemCheckPT::inLoadbalancing;
1894 #endif
1895         return 0;
1896 }
1897
1898 extern "C"
1899 void CkResetInLdb(){
1900 #if CMK_MEM_CHECKPOINT
1901         CkMemCheckPT::inLoadbalancing = 0;
1902 #endif
1903 }
1904
1905 /*****************************************************************************
1906                 module initialization
1907 *****************************************************************************/
1908
1909 static int arg_where = CkCheckPoint_inMEM;
1910
1911 #if CMK_MEM_CHECKPOINT
1912 void init_memcheckpt(char **argv)
1913 {
1914     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1915       arg_where = CkCheckPoint_inDISK;
1916     }
1917
1918         // initiliazing _crashedNode variable
1919         CpvInitialize(int, _crashedNode);
1920         CpvInitialize(int, _remoteCrashedNode);
1921         CpvAccess(_crashedNode) = -1;
1922         CpvAccess(_remoteCrashedNode) = -1;
1923 }
1924 #endif
1925
1926 class CkMemCheckPTInit: public Chare {
1927 public:
1928   CkMemCheckPTInit(CkArgMsg *m) {
1929 #if CMK_MEM_CHECKPOINT
1930     if (arg_where == CkCheckPoint_inDISK) {
1931       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1932     }
1933     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1934     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1935 #endif
1936   }
1937 };
1938
1939 static void notifyHandler(char *msg)
1940 {
1941 #if CMK_MEM_CHECKPOINT
1942   CmiFree(msg);
1943       /* immediately increase restart phase to filter old messages */
1944   CpvAccess(_curRestartPhase) ++;
1945   CpvAccess(_qd)->flushStates();
1946   _discard_charm_message();
1947
1948 #endif
1949 }
1950
1951 extern "C"
1952 void notify_crash(int node)
1953 {
1954 #ifdef CMK_MEM_CHECKPOINT
1955   CpvAccess( _crashedNode) = node;
1956 #ifdef CMK_SMP
1957   for(int i=0;i<CkMyNodeSize();i++){
1958         CpvAccessOther(_crashedNode,i)=node;
1959   }
1960 #endif
1961   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1962   CkMemCheckPT::inRestarting = 1;
1963
1964     // this may be in interrupt handler, send a message to reset QD
1965   int pe = CmiNodeFirst(CkMyNode());
1966   for(int i=0;i<CkMyNodeSize();i++){
1967         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1968         CmiSetHandler(msg, notifyHandlerIdx);
1969         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1970   }
1971 #endif
1972 }
1973
1974 extern "C" void (*notify_crash_fn)(int node);
1975
1976
1977 #if CMK_CONVERSE_MPI
1978 void buddyDieHandler(char *msg)
1979 {
1980 #if CMK_MEM_CHECKPOINT
1981    // notify
1982         CkMemCheckPT::inRestarting = 1;
1983    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1984    notify_crash(diepe);
1985    // send message to crash pe to let it restart
1986    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1987    CkPrintf("[%d] finding newrank\n",CkMyPe());
1988    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1989    CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1990    int buddy = obj->BuddyPE(CmiMyPe());
1991    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1992    if (buddy == diepe)  {
1993      mpi_restart_crashed(diepe, newrank);
1994    }
1995 #endif
1996 }
1997
1998 void pingHandler(void *msg)
1999 {
2000   lastPingTime = CmiWallTimer();
2001   CmiFree(msg);
2002 }
2003
2004 void pingCheckHandler()
2005 {
2006 #if CMK_MEM_CHECKPOINT
2007   double now = CmiWallTimer();
2008   if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2009   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2010     int i, pe, buddy;
2011     // tell everyone the buddy dies
2012     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2013     for (i = 1; i < CmiNumPes(); i++) {
2014        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2015        if (obj->BuddyPE(pe) == CmiMyPe()) break;
2016     }
2017     buddy = pe;
2018     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2019     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
2020       if (obj->isFailed(pe) || pe == buddy) continue;
2021       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2022       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2023       CmiSetHandler(msg, buddyDieHandlerIdx);
2024       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2025     }*/
2026     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2027     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2028     CmiSetHandler(msg, buddyDieHandlerIdx);
2029     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2030     //send to everyone in the other world
2031         if(CmiNumPartition()!=1){
2032                 for(int i=0;i<CmiNumPes();i++){
2033                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2034                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2035                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
2036                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2037                 }
2038         }
2039   }
2040   else 
2041     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2042 #endif
2043 }
2044
2045 void pingBuddy()
2046 {
2047 #if CMK_MEM_CHECKPOINT
2048   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2049   if (obj) {
2050     int buddy = obj->BuddyPE(CkMyPe());
2051     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2052     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2053     CmiSetHandler(msg, pingHandlerIdx);
2054     CmiGetRestartPhase(msg) = 9999;
2055     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2056   }
2057   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2058 #endif
2059 }
2060 #endif
2061
2062 // initproc
2063 void CkRegisterRestartHandler( )
2064 {
2065 #if CMK_MEM_CHECKPOINT
2066   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2067   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2068   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2069   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2070   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2071   
2072   //for replica
2073   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2074   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2075   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2076   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2077   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2078   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2079   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2080   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2081
2082 #if CMK_CONVERSE_MPI
2083   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2084   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2085   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2086 #endif
2087
2088   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2089   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2090   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2091   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2092   CpvInitialize(int,curPointer);
2093   CpvInitialize(int,recvdLocal);
2094   CpvInitialize(int,recvdRemote);
2095   CpvInitialize(int,recvdProcChkp);
2096   CpvInitialize(int,recvdArrayChkp);
2097
2098   CpvAccess(procChkptBuf) = NULL;
2099   CpvAccess(buddyBuf) = NULL;
2100   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2101   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2102   CpvAccess(chkpBuf)[0] = NULL;
2103   CpvAccess(chkpBuf)[1] = NULL;
2104   CpvAccess(localProcChkpBuf)[0] = NULL;
2105   CpvAccess(localProcChkpBuf)[1] = NULL;
2106   
2107   CpvAccess(curPointer) = 0;
2108   CpvAccess(recvdLocal) = 0;
2109   CpvAccess(recvdRemote) = 0;
2110   CpvAccess(recvdProcChkp) = 0;
2111   CpvAccess(recvdArrayChkp) = 0;
2112
2113   notify_crash_fn = notify_crash;
2114
2115 #if ! CMK_CONVERSE_MPI
2116   // print pid to kill
2117 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2118 //  sleep(4);
2119 #endif
2120 #endif
2121 }
2122
2123
2124 extern "C"
2125 int CkHasCheckpoints()
2126 {
2127   return checkpointed;
2128 }
2129
2130 /// @todo: the following definitions should be moved to a separate file containing
2131 // structures and functions about fault tolerance strategies
2132
2133 /**
2134  *  * @brief: function for killing a process                                             
2135  *   */
2136 #ifdef CMK_MEM_CHECKPOINT
2137 #if CMK_HAS_GETPID
2138 void killLocal(void *_dummy,double curWallTime){
2139         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2140         if(CmiWallTimer()<killTime-1){
2141                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2142         }else{ 
2143 #if CMK_CONVERSE_MPI
2144                                 CkDieNow();
2145 #else 
2146                 kill(getpid(),SIGKILL);                                               
2147 #endif
2148         }              
2149
2150 #else
2151 void killLocal(void *_dummy,double curWallTime){
2152   CmiAbort("kill() not supported!");
2153 }
2154 #endif
2155 #endif
2156
2157 #ifdef CMK_MEM_CHECKPOINT
2158 /**
2159  * @brief: reads the file with the kill information
2160  */
2161 void readKillFile(){
2162         FILE *fp=fopen(killFile,"r");
2163         if(!fp){
2164                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2165                 return;
2166         }
2167         int proc;
2168         double sec;
2169         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2170                 if(proc == CkMyNode() && CkMyRank() == 0){
2171                         killTime = CmiWallTimer()+sec;
2172                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2173                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2174                 }
2175         }
2176         fclose(fp);
2177 }
2178
2179 #if ! CMK_CONVERSE_MPI
2180 void CkDieNow()
2181 {
2182 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2183          // ignored for non-mpi version
2184         CmiPrintf("[%d] die now.\n", CmiMyPe());
2185         killTime = CmiWallTimer()+0.001;
2186         CcdCallFnAfter(killLocal,NULL,1);
2187 #endif
2188 }
2189 #endif
2190
2191 #endif
2192
2193 #include "CkMemCheckpoint.def.h"
2194
2195