Adding support for fault injection - not tested
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117  }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, recvdArrayChkp);
131 CpvDeclare(int, recvdProcChkp);
132
133 bool compare(char * buf1, char * buf2);
134 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
135 // Converse function handles
136 static int askPhaseHandlerIdx;
137 static int recvPhaseHandlerIdx;
138 static int askProcDataHandlerIdx;
139 static int restartBcastHandlerIdx;
140 static int recoverProcDataHandlerIdx;
141 static int restartBeginHandlerIdx;
142 static int recvRemoteChkpHandlerIdx;
143 static int replicaDieHandlerIdx;
144 static int replicaDieBcastHandlerIdx;
145 static int replicaRecoverHandlerIdx;
146 static int recoverRemoteProcDataHandlerIdx;
147 static int recoverRemoteArrayDataHandlerIdx;
148 static int notifyHandlerIdx;
149 // compute the backup processor
150 // FIXME: avoid crashed processors
151 #if CMK_CONVERSE_MPI
152 static int pingHandlerIdx;
153 static int pingCheckHandlerIdx;
154 static int buddyDieHandlerIdx;
155 static double lastPingTime = -1;
156
157 extern "C" void mpi_restart_crashed(int pe, int rank);
158 extern "C" int  find_spare_mpirank(int pe,int partition);
159
160 void pingBuddy();
161 void pingCheckHandler();
162
163 #endif
164 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
165
166 inline int CkMemCheckPT::BuddyPE(int pe)
167 {
168   int budpe;
169 #if NODE_CHECKPOINT
170     // buddy is the processor with same rank on the next physical node
171   int r1 = CmiPhysicalRank(pe);
172   int budnode = CmiPhysicalNodeID(pe);
173   do {
174     budnode = (budnode+1)%CmiNumPhysicalNodes();
175     int *pelist;
176     int num;
177     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
178     budpe = pelist[r1 % num];
179   } while (isFailed(budpe));
180   if (budpe == pe) {
181     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
182     CmiAbort("Failed to find a buddy processor");
183   }
184 #else
185   budpe = pe;
186   while (budpe == pe || isFailed(budpe)) 
187           budpe = (budpe+1)%CkNumPes();
188 #endif
189   return budpe;
190 }
191
192 // called in array element constructor
193 // choose and register with 2 buddies for checkpoiting 
194 #if CMK_MEM_CHECKPOINT
195 void ArrayElement::init_checkpt() {
196         if (_memChkptOn == 0) return;
197         if (CkInRestarting()) {
198           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
199         }
200         // only master init checkpoint
201         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
202
203         budPEs[0] = CkMyPe();
204         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
205         CmiAssert(budPEs[0] != budPEs[1]);
206         // inform checkPTMgr
207         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
208         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
209         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
210         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
211 }
212 #endif
213
214 // entry function invoked by checkpoint mgr asking for checkpoint data
215 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
216 #if CMK_MEM_CHECKPOINT
217 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
218 //char index[128];   thisIndexMax.sprint(index);
219 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
220   CkLocMgr *locMgr = thisArray->getLocMgr();
221   CmiAssert(myRec!=NULL);
222   int size;
223   {
224         PUP::sizer p;
225         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
226         size = p.size();
227   }
228   int packSize = size/sizeof(double) +1;
229   CkArrayCheckPTMessage *msg =
230                  new (packSize, 0) CkArrayCheckPTMessage;
231   msg->len = size;
232   msg->index =thisIndexMax;
233   msg->aid = thisArrayID;
234   msg->locMgr = locMgr->getGroupID();
235   msg->cp_flag = 1;
236   {
237         PUP::toMem p(msg->packData);
238         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
239   }
240
241   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
242   checkptMgr.recvData(msg, 2, budPEs);
243   delete m;
244 #endif
245 }
246
247 // checkpoint holder class - for memory checkpointing
248 class CkMemCheckPTInfo: public CkCheckPTInfo
249 {
250   CkArrayCheckPTMessage *ckBuffer;
251 public:
252   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
253                     CkCheckPTInfo(a, loc, idx, pno)
254   {
255     ckBuffer = NULL;
256   }
257   ~CkMemCheckPTInfo() 
258   {
259     if (ckBuffer) delete ckBuffer; 
260   }
261   inline void updateBuffer(CkArrayCheckPTMessage *data) 
262   {
263     CmiAssert(data!=NULL);
264     if (ckBuffer) delete ckBuffer;
265     ckBuffer = data;
266   }    
267   inline CkArrayCheckPTMessage * getCopy()
268   {
269     if (ckBuffer == NULL) {
270       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
271       CmiAbort("Abort!");
272     }
273     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
274   }     
275   inline void updateBuddy(int b1, int b2) {
276      CmiAssert(ckBuffer);
277      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      CmiAssert(ckBuffer);
283      return ckBuffer->len; 
284   }
285 };
286
287 // checkpoint holder class - for in-disk checkpointing
288 class CkDiskCheckPTInfo: public CkCheckPTInfo 
289 {
290   char *fname;
291   int bud1, bud2;
292   int len;                      // checkpoint size
293 public:
294   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
295   {
296 #if CMK_USE_MKSTEMP
297     fname = new char[64];
298     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
299     mkstemp(fname);
300 #else
301     fname=tmpnam(NULL);
302 #endif
303     bud1 = bud2 = -1;
304     len = 0;
305   }
306   ~CkDiskCheckPTInfo() 
307   {
308     remove(fname);
309   }
310   inline void updateBuffer(CkArrayCheckPTMessage *data) 
311   {
312     double t = CmiWallTimer();
313     // unpack it
314     envelope *env = UsrToEnv(data);
315     CkUnpackMessage(&env);
316     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
317     FILE *f = fopen(fname,"wb");
318     PUP::toDisk p(f);
319     CkPupMessage(p, (void **)&data);
320     // delay sync to the end because otherwise the messages are blocked
321 //    fsync(fileno(f));
322     fclose(f);
323     bud1 = data->bud1;
324     bud2 = data->bud2;
325     len = data->len;
326     delete data;
327     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
328   }
329   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
330   {
331     CkArrayCheckPTMessage *data;
332     FILE *f = fopen(fname,"rb");
333     PUP::fromDisk p(f);
334     CkPupMessage(p, (void **)&data);
335     fclose(f);
336     data->bud1 = bud1;                          // update the buddies
337     data->bud2 = bud2;
338     return data;
339   }
340   inline void updateBuddy(int b1, int b2) {
341      bud1 = b1; bud2 = b2;
342      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
343      CmiAssert(pNo != CkMyPe());
344   }
345   inline int getSize() { 
346      return len; 
347   }
348 };
349
350 CkMemCheckPT::CkMemCheckPT(int w)
351 {
352   int numnodes = 0;
353 #if NODE_CHECKPOINT
354   numnodes = CmiNumPhysicalNodes();
355 #else
356   numnodes = CkNumPes();
357 #endif
358 #if CK_NO_PROC_POOL
359   if (numnodes <= 2)
360 #else
361   if (numnodes  == 1)
362 #endif
363   {
364     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
365     _memChkptOn = 0;
366   }
367   inRestarting = 0;
368   inCheckpointing = 0;
369   recvCount = peCount = 0;
370   ackCount = 0;
371   expectCount = -1;
372   where = w;
373   replicaAlive = 1;
374 #if CMK_CONVERSE_MPI
375   void pingBuddy();
376   void pingCheckHandler();
377   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
378   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
379 #endif
380   chkpTable[0] = NULL;
381   chkpTable[1] = NULL;
382   maxIter = -1;
383   recvIterCount = 0;
384   localDecided = false;
385 }
386
387 CkMemCheckPT::~CkMemCheckPT()
388 {
389   int len = ckTable.length();
390   for (int i=0; i<len; i++) {
391     delete ckTable[i];
392   }
393 }
394
395 void CkMemCheckPT::pup(PUP::er& p) 
396
397   CBase_CkMemCheckPT::pup(p); 
398   p|cpStarter;
399   p|thisFailedPe;
400   p|failedPes;
401   p|ckCheckPTGroupID;           // recover global variable
402   p|cpCallback;                 // store callback
403   p|where;                      // where to checkpoint
404   p|peCount;
405   if (p.isUnpacking()) {
406     recvCount = 0;
407 #if CMK_CONVERSE_MPI
408     void pingBuddy();
409     void pingCheckHandler();
410     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
411     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
412 #endif
413           maxIter = -1;
414           recvIterCount = 0;
415           localDecided = false;
416   }
417 }
418
419 void CkMemCheckPT::getIter(){
420         localDecided = true;
421         localMaxIter = maxIter+1;
422         contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
423 }
424
425 //when one replica failes, decide the next chkp iter
426
427 void CkMemCheckPT::recvIter(int iter){
428         if(maxIter<iter){
429                 maxIter=iter;
430         }
431 }
432
433 void CkMemCheckPT::recvMaxIter(int iter){
434         localDecided = false;
435         CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
436 }
437
438 void CkMemCheckPT::reachChkpIter(){
439         recvIterCount++;
440         elemCount = CkCountArrayElements();
441         if(recvIterCount == elemCount){
442                 recvIterCount = 0;
443                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
444         }
445 }
446
447 void CkMemCheckPT::startChkp(){
448         CkStartMemCheckpoint(cpCallback);
449 }
450
451 // called by checkpoint mgr to restore an array element
452 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
453 {
454 #if CMK_MEM_CHECKPOINT
455   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
456   // m->index.print();
457   PUP::fromMem p(m->packData);
458   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
459   CmiAssert(mgr);
460 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
461   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
462 #else
463   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
464 #endif
465
466   // find a list of array elements bound together
467   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
468   CmiAssert(elt);
469   CkLocRec_local *rec = elt->myRec;
470   CkVec<CkMigratable *> list;
471   mgr->migratableList(rec, list);
472   CmiAssert(list.length() > 0);
473   for (int l=0; l<list.length(); l++) {
474     elt = (ArrayElement *)list[l];
475     elt->budPEs[0] = m->bud1;
476     elt->budPEs[1] = m->bud2;
477     //    reset, may not needed now
478     // for now.
479     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
480       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
481       if (c) c->redNo = 0;
482     }
483   }
484 #endif
485 }
486
487 // return 1 if pe is a crashed processor
488 int CkMemCheckPT::isFailed(int pe)
489 {
490   for (int i=0; i<failedPes.length(); i++)
491     if (failedPes[i] == pe) return 1;
492   return 0;
493 }
494
495 // add pe into history list of all failed processors
496 void CkMemCheckPT::failed(int pe)
497 {
498   if (isFailed(pe)) return;
499   failedPes.push_back(pe);
500 }
501
502 int CkMemCheckPT::totalFailed()
503 {
504   return failedPes.length();
505 }
506
507 // create an checkpoint entry for array element of aid with index.
508 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
509 {
510   // error check, no duplicate
511   int idx, len = ckTable.size();
512   for (idx=0; idx<len; idx++) {
513     CkCheckPTInfo *entry = ckTable[idx];
514     if (index == entry->index) {
515       if (loc == entry->locMgr) {
516           // bindTo array elements
517           return;
518       }
519         // for array inheritance, the following check may fail
520         // because ArrayElement constructor of all superclasses are called
521       if (aid == entry->aid) {
522         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
523         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
524       }
525     }
526   }
527   CkCheckPTInfo *newEntry;
528   if (where == CkCheckPoint_inMEM)
529     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
530   else
531     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
532   ckTable.push_back(newEntry);
533   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
534 }
535
536 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
537 {
538 #if !CMK_CHKP_ALL       
539   int buddy = msg->bud1;
540   if (buddy == CkMyPe()) buddy = msg->bud2;
541   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
542   recvData(msg);
543     // ack
544   thisProxy[buddy].gotData();
545 #else
546   chkpTable[0] = NULL;
547   chkpTable[1] = NULL;
548   recvArrayCheckpoint(msg);
549   thisProxy[msg->bud2].gotData();
550 #endif
551 }
552
553 // loop through my checkpoint table and ask checkpointed array elements
554 // to send me checkpoint data.
555 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
556 {
557   checkpointed = 1;
558   cpCallback = cb;
559   cpStarter = starter;
560   inCheckpointing = 1;
561   if (CkMyPe() == cpStarter) {
562     startTime = CmiWallTimer();
563     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
564   }
565 #if CMK_CONVERSE_MPI
566   if(CmiNumPartition()==1)
567 #endif    
568   {
569
570 #if !CMK_CHKP_ALL
571   int len = ckTable.length();
572   for (int i=0; i<len; i++) {
573     CkCheckPTInfo *entry = ckTable[i];
574       // always let the bigger number processor send request
575     //if (CkMyPe() < entry->pNo) continue;
576       // always let the smaller number processor send request, may on same proc
577     if (!isMaster(entry->pNo)) continue;
578       // call inmem_checkpoint to the array element, ask it to send
579       // back checkpoint data via recvData().
580     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
581     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
582   }
583     // if my table is empty, then I am done
584   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
585 #else
586   startArrayCheckpoint();
587 #endif
588         sendProcData();
589   }
590 #if CMK_CONVERSE_MPI
591   else
592   {
593         startCheckpoint();
594   }
595 #endif
596   // pack and send proc level data
597 }
598
599 class MemElementPacker : public CkLocIterator{
600         private:
601                 CkLocMgr *locMgr;
602                 PUP::er &p;
603         public:
604                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
605                 void addLocation(CkLocation &loc){
606                         CkArrayIndexMax idx = loc.getIndex();
607                         CkGroupID gID = locMgr->ckGetGroupID();
608                         p|gID;
609                         p|idx;
610                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
611                 }
612 };
613
614 void pupAllElements(PUP::er &p){
615 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
616         int numElements;
617         if(!p.isUnpacking()){
618                 numElements = CkCountArrayElements();
619         }
620         p | numElements;
621         if(!p.isUnpacking()){
622                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
623         }
624 #endif
625 }
626
627 void CkMemCheckPT::startArrayCheckpoint(){
628 #if CMK_CHKP_ALL
629         int size;
630         {
631                 PUP::sizer psizer;
632                 pupAllElements(psizer);
633                 size = psizer.size();
634         }
635         int packSize = size/sizeof(double)+1;
636  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
637         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
638         msg->len = size;
639         msg->cp_flag = 1;
640         int budPEs[2];
641         msg->bud1=CkMyPe();
642         msg->bud2=ChkptOnPe(CkMyPe());
643         {
644                 PUP::toMem p(msg->packData);
645                 pupAllElements(p);
646         }
647         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
648         if(chkpTable[0]) delete chkpTable[0];
649         chkpTable[0] = msg;
650         //send the checkpoint to my 
651         recvCount++;
652 #endif
653 }
654
655 void CkMemCheckPT::startCheckpoint(){
656 #if CMK_CONVERSE_MPI
657         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
658     CkPrintf("in start checkpointing!!!!\n");
659   int size;
660   {
661     PUP::sizer p;
662     _handleProcData(p,CmiFalse);
663     size = p.size();
664   }
665         int packSize = size/sizeof(double)+1;
666         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
667         procMsg->len = size;
668         procMsg->cp_flag = 1;
669         {
670                 PUP::toMem p(procMsg->packData);
671                 _handleProcData(p,CmiFalse);
672         }
673         int pointer = CpvAccess(curPointer);
674         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
675                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
676         
677         {
678                 PUP::sizer psizer;
679                 pupAllElements(psizer);
680                 size = psizer.size();
681         }
682         packSize = size/sizeof(double)+1;
683         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
684         msg->len = size;
685         msg->cp_flag = 1;
686         {
687                 PUP::toMem p(msg->packData);
688                 pupAllElements(p);
689         }
690         pointer = CpvAccess(curPointer);
691         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
692     CkPrintf("start checkpointing!!!!\n");
693         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
694                 CpvAccess(chkpBuf)[pointer] = msg;
695         if(CkReplicaAlive()==1){
696                 CpvAccess(recvdLocal) = 1;
697                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
698                 CkPackMessage(&env);
699                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
700                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
701         }
702         if(CpvAccess(recvdRemote)==1){
703                 //compare the checkpoint 
704           int size = CpvAccess(chkpBuf)[pointer]->len;
705           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
706                 thisProxy[CkMyPe()].doneComparison(true);
707           }else{
708                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
709                 thisProxy[CkMyPe()].doneComparison(false);
710           }
711   }
712         else{
713                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
714                         {       
715                                 int pointer = CpvAccess(curPointer);
716                                 //send the proc data
717                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
718                                 procMsg->pointer = pointer;
719                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
720                                 CkPackMessage(&env);
721                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
722                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
723                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
724                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
725                                 }
726                         }
727                         //send the array checkpoint data
728                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
729                         msg->pointer = CpvAccess(curPointer);
730                         envelope * env = (envelope *)(UsrToEnv(msg));
731                         CkPackMessage(&env);
732                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
733                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
734                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
735                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
736                         //can continue work, no need to wait for my replica
737                 }
738         }
739 #endif
740 }
741
742 void CkMemCheckPT::doneComparison(bool ret){
743         int _ret;
744         if(!ret){
745         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
746                 _ret = 0;
747         }else{
748                 _ret = 1;
749         }
750         inCheckpointing = 0;
751         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
752         contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
753 }
754
755 void CkMemCheckPT::doneRComparison(int ret){
756         CpvAccess(recvdRemote) = 0;
757         CpvAccess(recvdLocal) = 0;
758 //      if(CpvAccess(curPointer) == 0){
759         if(ret==1){
760         CpvAccess(curPointer)^=1;
761                 if(CkMyPe() == 0){
762                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
763                 }
764                 CKLOCMGR_LOOP(mgr->resumeFromChkp(););
765         }
766         else{
767         CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
768                 RollBack();
769         }
770 }
771
772 void CkMemCheckPT::RollBack(){
773         //restore group data
774         checkpointed = 0;
775         CkMemCheckPT::inRestarting = 1;
776         int pointer = CpvAccess(curPointer)^1;//use the previous one
777     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
778         PUP::fromMem p(chkpMsg->packData);      
779         
780         //destroy array elements
781         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
782           int numGroups = CkpvAccess(_groupIDTable)->size();
783           for(int i=0;i<numGroups;i++) {
784                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
785                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
786                 obj->flushStates();
787                 obj->ckJustMigrated();
788           }
789         //restore array elements
790         
791         int numElements;
792         p|numElements;
793         
794         if(p.isUnpacking()){
795                 for(int i=0;i<numElements;i++){
796                 //for(int i=0;i<1;i++){
797                         CkGroupID gID;
798                         CkArrayIndex idx;
799                         p|gID;
800                         p|idx;
801                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
802                         CmiAssert(mgr);
803                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
804                 }
805         }
806         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
807         contribute(cb);
808 }
809
810 void CkMemCheckPT::notifyReplicaDie(int pe){
811         //CkPrintf("[%d] receive replica die\n",CkMyPe());
812         replicaAlive = 0;
813         CpvAccess(_remoteCrashedNode) = pe;
814 }
815
816 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
817 {
818 #if CMK_CHKP_ALL
819         int idx = 1;
820         if(msg->bud1 == CkMyPe()){
821                 idx = 0;
822         }
823         int isChkpting = msg->cp_flag;
824         if(isChkpting == 1){
825                 if(chkpTable[idx]) delete chkpTable[idx];
826         }
827         chkpTable[idx] = msg;
828         if(isChkpting){
829                 recvCount++;
830                 if(recvCount == 2){
831                   if (where == CkCheckPoint_inMEM) {
832                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
833                   }
834                   else if (where == CkCheckPoint_inDISK) {
835                         // another barrier for finalize the writing using fsync
836                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
837                         contribute(0,NULL,CkReduction::sum_int,localcb);
838                   }
839                   else
840                         CmiAbort("Unknown checkpoint scheme");
841                   recvCount = 0;
842                 }
843         }
844 #endif
845 }
846
847 // don't handle array elements
848 static inline void _handleProcData(PUP::er &p, CmiBool create)
849 {
850     // save readonlys, and callback BTW
851     CkPupROData(p);
852
853     // save mainchares 
854     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
855         
856 #ifndef CMK_CHARE_USE_PTR
857     // save non-migratable chare
858     CkPupChareData(p);
859 #endif
860
861     // save groups into Groups.dat
862     CkPupGroupData(p,create);
863
864     // save nodegroups into NodeGroups.dat
865     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
866 }
867
868 void CkMemCheckPT::sendProcData()
869 {
870   // find out size of buffer
871   int size;
872   {
873     PUP::sizer p;
874     _handleProcData(p,CmiTrue);
875     size = p.size();
876   }
877   int packSize = size;
878   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
879   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
880   {
881     PUP::toMem p(msg->packData);
882     _handleProcData(p,CmiTrue);
883   }
884   msg->pe = CkMyPe();
885   msg->len = size;
886   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
887   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
888 }
889
890 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
891 {
892   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
893   CpvAccess(procChkptBuf) = msg;
894   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
895   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
896 }
897
898 // ArrayElement call this function to give us the checkpointed data
899 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
900 {
901   int len = ckTable.length();
902   int idx;
903   for (idx=0; idx<len; idx++) {
904     CkCheckPTInfo *entry = ckTable[idx];
905     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
906   }
907   CkAssert(idx < len);
908   int isChkpting = msg->cp_flag;
909   ckTable[idx]->updateBuffer(msg);
910   if (isChkpting) {
911       // all my array elements have returned their inmem data
912       // inform starter processor that I am done.
913     recvCount ++;
914     if (recvCount == ckTable.length()) {
915       if (where == CkCheckPoint_inMEM) {
916         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
917       }
918       else if (where == CkCheckPoint_inDISK) {
919         // another barrier for finalize the writing using fsync
920         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
921         contribute(0,NULL,CkReduction::sum_int,localcb);
922       }
923       else
924         CmiAbort("Unknown checkpoint scheme");
925       recvCount = 0;
926     } 
927   }
928 }
929
930 // only used in disk checkpointing
931 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
932 {
933   delete m;
934 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
935   system("sync");
936 #endif
937   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
938 }
939
940 // only is called on cpStarter when checkpoint is done
941 void CkMemCheckPT::cpFinish()
942 {
943   CmiAssert(CkMyPe() == cpStarter);
944   peCount++;
945     // now that all processors have finished, activate callback
946   if (peCount == 2) 
947 {
948     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
949     peCount = 0;
950     thisProxy.report();
951   }
952 }
953
954 // for debugging, report checkpoint info
955 void CkMemCheckPT::report()
956 {
957         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
958         inCheckpointing = 0;
959 #if !CMK_CHKP_ALL
960   int objsize = 0;
961   int len = ckTable.length();
962   for (int i=0; i<len; i++) {
963     CkCheckPTInfo *entry = ckTable[i];
964     CmiAssert(entry);
965     objsize += entry->getSize();
966   }
967   CmiAssert(CpvAccess(procChkptBuf));
968   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
969 #else
970   if(CkMyPe()==0)
971   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
972 #endif
973 }
974
975 /*****************************************************************************
976                         RESTART Procedure
977 *****************************************************************************/
978
979 // master processor of two buddies
980 inline int CkMemCheckPT::isMaster(int buddype)
981 {
982 #if 0
983   int mype = CkMyPe();
984 //CkPrintf("ismaster: %d %d\n", pe, mype);
985   if (CkNumPes() - totalFailed() == 2) {
986     return mype > buddype;
987   }
988   for (int i=1; i<CkNumPes(); i++) {
989     int me = (buddype+i)%CkNumPes();
990     if (isFailed(me)) continue;
991     if (me == mype) return 1;
992     else return 0;
993   }
994   return 0;
995 #else
996     // smaller one
997   int mype = CkMyPe();
998 //CkPrintf("ismaster: %d %d\n", pe, mype);
999   if (CkNumPes() - totalFailed() == 2) {
1000     return mype < buddype;
1001   }
1002 #if NODE_CHECKPOINT
1003   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1004   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1005 #else
1006   for (int i=1; i<CkNumPes(); i++) {
1007 #endif
1008     int me = (mype+i)%CkNumPes();
1009     if (isFailed(me)) continue;
1010     if (me == buddype) return 1;
1011     else return 0;
1012   }
1013   return 0;
1014 #endif
1015 }
1016
1017
1018
1019 #if 0
1020 // helper class to pup all elements that belong to same ckLocMgr
1021 class ElementDestoryer : public CkLocIterator {
1022 private:
1023         CkLocMgr *locMgr;
1024 public:
1025         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1026         void addLocation(CkLocation &loc) {
1027                 CkArrayIndex idx=loc.getIndex();
1028                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1029                 loc.destroy();
1030         }
1031 };
1032 #endif
1033
1034 // restore the bitmap vector for LB
1035 void CkMemCheckPT::resetLB(int diepe)
1036 {
1037 #if CMK_LBDB_ON
1038   int i;
1039   char *bitmap = new char[CkNumPes()];
1040   // set processor available bitmap
1041   get_avail_vector(bitmap);
1042
1043   for (i=0; i<failedPes.length(); i++)
1044     bitmap[failedPes[i]] = 0; 
1045   bitmap[diepe] = 0;
1046
1047 #if CK_NO_PROC_POOL
1048   set_avail_vector(bitmap);
1049 #endif
1050
1051   // if I am the crashed pe, rebuild my failedPEs array
1052   if (CkMyNode() == diepe)
1053     for (i=0; i<CkNumPes(); i++) 
1054       if (bitmap[i]==0) failed(i);
1055
1056   delete [] bitmap;
1057 #endif
1058 }
1059
1060 static double restartT;
1061
1062 // in case when failedPe dies, everybody go through its checkpoint table:
1063 // destory all array elements
1064 // recover lost buddies
1065 // reconstruct all array elements from check point data
1066 // called on all processors
1067 void CkMemCheckPT::restart(int diePe)
1068 {
1069 #if CMK_MEM_CHECKPOINT
1070   double curTime = CmiWallTimer();
1071   if (CkMyPe() == diePe){
1072            restartT = CmiWallTimer();
1073     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1074   }
1075   stage = (char*)"resetLB";
1076   startTime = curTime;
1077   if (CkMyPe() == diePe)
1078     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1079
1080 #if CK_NO_PROC_POOL
1081   failed(diePe);        // add into the list of failed pes
1082 #endif
1083   thisFailedPe = diePe;
1084
1085   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1086
1087   inRestarting = 1;
1088                                                                                 
1089   // disable load balancer's barrier
1090   //if (CkMyPe() != diePe) resetLB(diePe);
1091
1092   CKLOCMGR_LOOP(mgr->startInserting(););
1093
1094
1095   if(CmiNumPartition()==1){
1096         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1097   }else{
1098         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1099         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1100   }
1101 #endif
1102 }
1103
1104 // loally remove all array elements
1105 void CkMemCheckPT::removeArrayElements()
1106 {
1107 #if CMK_MEM_CHECKPOINT
1108   int len = ckTable.length();
1109   double curTime = CmiWallTimer();
1110   if (CkMyPe() == thisFailedPe) 
1111     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1112   stage = (char*)"removeArrayElements";
1113   startTime = curTime;
1114
1115 //  if (cpCallback.isInvalid()) 
1116 //        CkPrintf("invalid pe %d\n",CkMyPe());
1117 //        CkAbort("Didn't set restart callback\n");;
1118   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1119
1120   // get rid of all buffering and remote recs
1121   // including destorying all array elements
1122 #if CK_NO_PROC_POOL  
1123         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1124 #else
1125         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1126 #endif
1127   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1128 #endif
1129 }
1130
1131 // flush state in reduction manager
1132 void CkMemCheckPT::resetReductionMgr()
1133 {
1134   if (CkMyPe() == thisFailedPe) 
1135     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1136   int numGroups = CkpvAccess(_groupIDTable)->size();
1137   for(int i=0;i<numGroups;i++) {
1138     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1139     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1140     obj->flushStates();
1141     obj->ckJustMigrated();
1142   }
1143   // reset again
1144   //CpvAccess(_qd)->flushStates();
1145   if(CmiNumPartition()==1){
1146         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1147   }
1148   else
1149         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1150 }
1151
1152 // recover the lost buddies
1153 void CkMemCheckPT::recoverBuddies()
1154 {
1155   int idx;
1156   int len = ckTable.length();
1157   // ready to flush reduction manager
1158   // cannot be CkMemCheckPT::restart because destory will modify states
1159   double curTime = CmiWallTimer();
1160   if (CkMyPe() == thisFailedPe)
1161   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1162   stage = (char *)"recoverBuddies";
1163   if (CkMyPe() == thisFailedPe)
1164   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1165   startTime = curTime;
1166
1167   // recover buddies
1168   expectCount = 0;
1169 #if !CMK_CHKP_ALL
1170   for (idx=0; idx<len; idx++) {
1171     CkCheckPTInfo *entry = ckTable[idx];
1172     if (entry->pNo == thisFailedPe) {
1173 #if CK_NO_PROC_POOL
1174       // find a new buddy
1175       int budPe = BuddyPE(CkMyPe());
1176 #else
1177       int budPe = thisFailedPe;
1178 #endif
1179       CkArrayCheckPTMessage *msg = entry->getCopy();
1180       msg->bud1 = budPe;
1181       msg->bud2 = CkMyPe();
1182       msg->cp_flag = 0;            // not checkpointing
1183       thisProxy[budPe].recoverEntry(msg);
1184       expectCount ++;
1185     }
1186   }
1187 #else
1188   //send to failed pe
1189   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1190 #if CK_NO_PROC_POOL
1191       // find a new buddy
1192       int budPe = BuddyPE(CkMyPe());
1193 #else
1194       int budPe = thisFailedPe;
1195 #endif
1196       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1197       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1198           msg->cp_flag = 0;            // not checkpointing
1199       msg->bud1 = budPe;
1200       msg->bud2 = CkMyPe();
1201       thisProxy[budPe].recoverEntry(msg);
1202       expectCount ++;
1203   }
1204 #endif
1205
1206   if (expectCount == 0) {
1207           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1208   }
1209   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1210 }
1211
1212 void CkMemCheckPT::gotData()
1213 {
1214   ackCount ++;
1215   if (ackCount == expectCount) {
1216     ackCount = 0;
1217     expectCount = -1;
1218     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1219     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1220   }
1221 }
1222
1223 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1224 {
1225
1226   for (int i=0; i<n; i++) {
1227     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1228     mgr->updateLocation(idx[i], nowOnPe);
1229   }
1230         thisProxy[nowOnPe].gotReply();
1231 }
1232
1233 // restore array elements
1234 void CkMemCheckPT::recoverArrayElements()
1235 {
1236   double curTime = CmiWallTimer();
1237   int len = ckTable.length();
1238   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1239   stage = (char *)"recoverArrayElements";
1240   if (CkMyPe() == thisFailedPe)
1241   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1242   startTime = curTime;
1243  int flag = 0;
1244   // recover all array elements
1245   int count = 0;
1246
1247 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1248   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1249   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1250 #endif
1251
1252 #if !CMK_CHKP_ALL
1253   for (int idx=0; idx<len; idx++)
1254   {
1255     CkCheckPTInfo *entry = ckTable[idx];
1256 #if CK_NO_PROC_POOL
1257     // the bigger one will do 
1258 //    if (CkMyPe() < entry->pNo) continue;
1259     if (!isMaster(entry->pNo)) continue;
1260 #else
1261     // smaller one do it, which has the original object
1262     if (CkMyPe() == entry->pNo+1 || 
1263         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1264 #endif
1265 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1266
1267     entry->updateBuddy(CkMyPe(), entry->pNo);
1268     CkArrayCheckPTMessage *msg = entry->getCopy();
1269     // gzheng
1270     //thisProxy[CkMyPe()].inmem_restore(msg);
1271     inmem_restore(msg);
1272 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1273     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1274     int homePe = mgr->homePe(msg->index);
1275     if (homePe != CkMyPe()) {
1276       gmap[homePe].push_back(msg->locMgr);
1277       imap[homePe].push_back(msg->index);
1278     }
1279 #endif
1280     CkFreeMsg(msg);
1281     count ++;
1282   }
1283 #else
1284         double * packData;
1285         if(CmiNumPartition()==1){
1286                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1287                 packData = msg->packData;
1288         }
1289         else{
1290                 int pointer = CpvAccess(curPointer);
1291                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1292                 packData = msg->packData;
1293         }
1294 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1295         recoverAll(packData,gmap,imap);
1296 #else
1297         recoverAll(packData);
1298 #endif
1299 #endif
1300   curTime = CmiWallTimer();
1301   //if (CkMyPe() == thisFailedPe)
1302   if (CkMyPe() == 0)
1303         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1304 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1305   for (int i=0; i<CkNumPes(); i++) {
1306     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1307       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1308         flag++; 
1309         }
1310   }
1311   delete [] imap;
1312   delete [] gmap;
1313 #endif
1314   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1315
1316   CKLOCMGR_LOOP(mgr->doneInserting(););
1317
1318   // _crashedNode = -1;
1319   CpvAccess(_crashedNode) = -1;
1320   inRestarting = 0;
1321 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1322   if (CkMyPe() == 0)
1323     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1324 #else
1325 if(flag == 0)
1326 {
1327     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1328 }
1329 #endif
1330 }
1331
1332 void CkMemCheckPT::gotReply(){
1333     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1334 }
1335
1336 void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1337 #if CMK_CHKP_ALL
1338         PUP::fromMem p(packData);
1339         int numElements;
1340         p|numElements;
1341         if(p.isUnpacking()){
1342                 for(int i=0;i<numElements;i++){
1343                         CkGroupID gID;
1344                         CkArrayIndex idx;
1345                         p|gID;
1346                         p|idx;
1347                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1348                         int homePe = mgr->homePe(idx);
1349 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1350                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1351 #else
1352                         if(CmiNumPartition()==1)
1353                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1354                         else{
1355                                 if(CkMyPe()==thisFailedPe){
1356                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1357                                 }
1358                         else{
1359                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1360                                 }
1361                         }
1362 #endif
1363 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1364                         homePe = mgr->homePe(idx);
1365                         if (homePe != CkMyPe()) {
1366                           gmap[homePe].push_back(gID);
1367                           imap[homePe].push_back(idx);
1368                         }
1369 #endif
1370                 }
1371         }
1372 #endif
1373 }
1374
1375
1376 // on every processor
1377 // turn load balancer back on
1378 void CkMemCheckPT::finishUp()
1379 {
1380   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1381   //CKLOCMGR_LOOP(mgr->doneInserting(););
1382   
1383   if (CkMyPe() == thisFailedPe)
1384   {
1385        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1386        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1387   }
1388   CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1389         
1390 #if CMK_CONVERSE_MPI    
1391   if(CmiNumPartition()!=1){
1392         CpvAccess(recvdProcChkp) = 0;
1393         CpvAccess(recvdArrayChkp) = 0;
1394         CpvAccess(curPointer)^=1;
1395         //notify my replica, restart is done
1396    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1397    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1398    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1399   }
1400    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1401          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1402    }
1403 #endif
1404
1405 #if CK_NO_PROC_POOL
1406 #if NODE_CHECKPOINT
1407   int numnodes = CmiNumPhysicalNodes();
1408 #else
1409   int numnodes = CkNumPes();
1410 #endif
1411   if (numnodes-totalFailed() <=2) {
1412     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1413     _memChkptOn = 0;
1414   }
1415 #endif
1416 }
1417
1418 void CkMemCheckPT::recoverFromSoftFailure()
1419 {
1420         inRestarting = 0;
1421         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1422 }
1423 // called only on 0
1424 void CkMemCheckPT::quiescence(CkCallback &cb)
1425 {
1426   static int pe_count = 0;
1427   pe_count ++;
1428   CmiAssert(CkMyPe() == 0);
1429   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1430   if (pe_count == CkNumPes()) {
1431     pe_count = 0;
1432     cb.send();
1433   }
1434 }
1435
1436 // User callable function - to start a checkpoint
1437 // callback cb is used to pass control back
1438 void CkStartMemCheckpoint(CkCallback &cb)
1439 {
1440 #if CMK_MEM_CHECKPOINT
1441         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1442   if (_memChkptOn == 0) {
1443     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1444     cb.send();
1445     return;
1446   }
1447   if (CkInRestarting()) {
1448       // trying to checkpointing during restart
1449     cb.send();
1450     return;
1451   }
1452     // store user callback and user data
1453   CkMemCheckPT::cpCallback = cb;
1454
1455     // broadcast to start check pointing
1456   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1457   checkptMgr.doItNow(CkMyPe(), cb);
1458 #else
1459   // when mem checkpoint is disabled, invike cb immediately
1460   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1461   cb.send();
1462 #endif
1463 }
1464
1465 void CkRestartCheckPoint(int diePe)
1466 {
1467 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1468   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1469   // broadcast
1470   checkptMgr.restart(diePe);
1471 }
1472
1473 static int _diePE = -1;
1474
1475 // callback function used locally by ccs handler
1476 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1477 {
1478 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1479   CkRestartCheckPoint(_diePE);
1480 }
1481
1482
1483 // called on crashed PE
1484 static void restartBeginHandler(char *msg)
1485 {
1486   CmiFree(msg);
1487 #if CMK_MEM_CHECKPOINT
1488 #if CMK_USE_BARRIER
1489         if(CkMyPe()!=_diePE){
1490                 printf("restar begin on %d\n",CkMyPe());
1491                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1492                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1493                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1494         }else{
1495         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1496         CkRestartCheckPointCallback(NULL, NULL);
1497         }
1498 #else
1499   static int count = 0;
1500   CmiAssert(CkMyPe() == _diePE);
1501   count ++;
1502   if (count == CkNumPes()) {
1503           printf("restart begin on %d\n",CkMyPe());
1504     CkRestartCheckPointCallback(NULL, NULL);
1505     count = 0;
1506   }
1507 #endif
1508 #endif
1509 }
1510
1511 extern void _discard_charm_message();
1512 extern void _resume_charm_message();
1513
1514 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1515         return data;
1516 }
1517
1518 static void restartBcastHandler(char *msg)
1519 {
1520 #if CMK_MEM_CHECKPOINT
1521   // advance phase counter
1522   CkMemCheckPT::inRestarting = 1;
1523   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1524   // gzheng
1525   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1526
1527   if (CkMyPe()==_diePE)
1528     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1529
1530   // reset QD counters
1531 /*  gzheng
1532   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1533 */
1534
1535 /*  gzheng
1536   if (CkMyPe()==_diePE)
1537       CkRestartCheckPointCallback(NULL, NULL);
1538 */
1539   CmiFree(msg);
1540
1541   _resume_charm_message();
1542
1543     // reduction
1544   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1545   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1546 #if CMK_USE_BARRIER
1547         //CmiPrintf("before reduce\n"); 
1548         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1549         //CmiPrintf("after reduce\n");  
1550 #else
1551   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1552 #endif 
1553  checkpointed = 0;
1554 #endif
1555 }
1556
1557 extern void _initDone();
1558
1559 bool compare(char * buf1, char *buf2){
1560         //buf1 my copy, buf2 from another one 
1561 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1562         PUP::checker pchecker(buf1,buf2);
1563         pchecker.skip();
1564         
1565         int numElements;
1566         pchecker|numElements;
1567 //      CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1568         for(int i=0;i<numElements;i++){
1569         //for(int i=0;i<1;i++){
1570                 CkGroupID gID;
1571                 CkArrayIndex idx;
1572                 
1573                 pchecker|gID;
1574                 pchecker|idx;
1575                 
1576 //              CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1577                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1578                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1579 //              CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1580         }
1581         if(pchecker.getResult()){
1582                 CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
1583         }else{
1584                 CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
1585         }
1586         return pchecker.getResult();
1587 }
1588
1589 static void recvRemoteChkpHandler(char *msg){
1590    envelope *env = (envelope *)msg;
1591    CkUnpackMessage(&env);
1592    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1593   if(CpvAccess(recvdLocal)==1){
1594           int pointer = CpvAccess(curPointer);
1595           int size = CpvAccess(chkpBuf)[pointer]->len;
1596           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1597           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1598                         checkptMgr[CkMyPe()].doneComparison(true);
1599           }else
1600           {
1601                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1602                         checkptMgr[CkMyPe()].doneComparison(false);
1603           }
1604   }else{
1605           CpvAccess(recvdRemote) = 1;
1606           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1607           CpvAccess(buddyBuf) = chkpMsg;
1608   }  
1609 }
1610
1611 static void replicaRecoverHandler(char *msg){
1612         CpvAccess(_remoteCrashedNode) = -1;
1613         CkMemCheckPT::replicaAlive = 1;
1614     bool ret = true;
1615         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1616         checkptMgr[CkMyPe()].doneComparison(ret);
1617 }
1618
1619 static void replicaDieHandler(char * msg){
1620 #if CMK_CONVERSE_MPI    
1621         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1622         CpvAccess(_remoteCrashedNode) = diePe;
1623         CkMemCheckPT::replicaAlive = 0;
1624         find_spare_mpirank(diePe,CmiMyPartition()^1);
1625     if(CkMyPe()==diePe){
1626       CkPrintf("pe %d in replicad word die\n",diePe);
1627             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1628     }
1629 #endif
1630         //broadcast to my partition to get local max iter
1631     if(CkMyPe()==diePe){
1632                 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1633                 checkptMgr.getIter();
1634         }
1635     //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1636     //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1637 }
1638
1639
1640 static void replicaDieBcastHandler(char *msg){
1641         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1642         CpvAccess(_remoteCrashedNode) = diePe;
1643         CkMemCheckPT::replicaAlive = 0;
1644 }
1645
1646 static void recoverRemoteProcDataHandler(char *msg){
1647    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1648    envelope *env = (envelope *)msg;
1649    CkUnpackMessage(&env);
1650    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1651         
1652    //store the checkpoint
1653         int pointer = procMsg->pointer;
1654
1655
1656    if(CkMyPe()==CpvAccess(_crashedNode)){
1657            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1658            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1659            PUP::fromMem p(procMsg->packData);
1660            _handleProcData(p,CmiTrue);
1661            _initDone();
1662    }
1663    else{
1664            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1665            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1666            //_handleProcData(p,CmiFalse);
1667    }
1668    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1669    CKLOCMGR_LOOP(mgr->startInserting(););
1670    
1671    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1672    CpvAccess(recvdProcChkp) =1;
1673         if(CpvAccess(recvdArrayChkp)==1){
1674                 _resume_charm_message();
1675                 _diePE = CpvAccess(_crashedNode);
1676                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1677                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1678                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1679         }
1680 }
1681
1682 static void recoverRemoteArrayDataHandler(char *msg){
1683   if(CkMyPe()==CpvAccess(_crashedNode)) 
1684   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1685    envelope *env = (envelope *)msg;
1686    CkUnpackMessage(&env);
1687    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1688         
1689    //store the checkpoint
1690         int pointer = chkpMsg->pointer;
1691         CpvAccess(curPointer) = pointer;
1692         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1693         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1694    CpvAccess(recvdArrayChkp) =1;
1695         CkMemCheckPT::inRestarting = 1;
1696         if(CpvAccess(recvdProcChkp) == 1){
1697                 _resume_charm_message();
1698                 _diePE = CpvAccess(_crashedNode);
1699                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1700                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1701                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1702                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1703         }
1704 }
1705
1706 static void recvPhaseHandler(char * msg)
1707 {
1708         CpvAccess(_curRestartPhase)--;
1709         CkMemCheckPT::inRestarting = 1;
1710         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1711   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1712   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1713   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1714 //              CmiPrintf("start ping check handler\n");
1715 //       }
1716         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1717   // }
1718    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1719    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1720
1721 }
1722 // called on crashed processor
1723 static void recoverProcDataHandler(char *msg)
1724 {
1725 #if CMK_MEM_CHECKPOINT
1726    int i;
1727    envelope *env = (envelope *)msg;
1728    CkUnpackMessage(&env);
1729    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1730    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1731    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1732    PUP::fromMem p(procMsg->packData);
1733    _handleProcData(p,CmiTrue);
1734
1735    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1736    // gzheng
1737    CKLOCMGR_LOOP(mgr->startInserting(););
1738
1739    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1740    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1741    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1742    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1743
1744    _initDone();
1745 //   CpvAccess(_qd)->flushStates();
1746    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1747 #endif
1748 }
1749 //for replica, got the phase number from my neighbor processor in the current partition
1750 static void askPhaseHandler(char *msg)
1751 {
1752 #if CMK_MEM_CHECKPOINT
1753         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1754     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1755         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1756     CmiSetHandler(msg, recvPhaseHandlerIdx);
1757         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1758 #endif
1759 }
1760 // called on its backup processor
1761 // get backup message buffer and sent to crashed processor
1762 static void askProcDataHandler(char *msg)
1763 {
1764 #if CMK_MEM_CHECKPOINT
1765     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1766     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1767     if (CpvAccess(procChkptBuf) == NULL)  {
1768       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1769       CkAbort("no checkpoint found");
1770     }
1771     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1772     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1773
1774     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1775
1776     CkPackMessage(&env);
1777     CmiSetHandler(env, recoverProcDataHandlerIdx);
1778     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1779     CpvAccess(procChkptBuf) = NULL;
1780     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1781 #endif
1782 }
1783
1784 // called on PE 0
1785 void qd_callback(void *m)
1786 {
1787    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1788    CkFreeMsg(m);
1789    if(CmiNumPartition()==1){
1790 #ifdef CMK_SMP
1791            for(int i=0;i<CmiMyNodeSize();i++){
1792            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1793            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1794                 CmiSetHandler(msg, askProcDataHandlerIdx);
1795                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1796                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1797            }
1798            return;
1799 #endif
1800            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1801            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1802            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1803            CmiSetHandler(msg, askProcDataHandlerIdx);
1804            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1805            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1806         }
1807    else{
1808                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1809                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1810                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1811    }
1812 }
1813
1814 // on crashed node
1815 void CkMemRestart(const char *dummy, CkArgMsg *args)
1816 {
1817 #if CMK_MEM_CHECKPOINT
1818    _diePE = CmiMyNode();
1819    CpvAccess( _crashedNode )= CmiMyNode();
1820    CkMemCheckPT::inRestarting = 1;
1821    _discard_charm_message();
1822   
1823   /*if(CmiMyRank()==0){
1824     CkCallback cb(qd_callback);
1825     CkStartQD(cb);
1826     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1827   }*/
1828    if(CmiNumPartition()==1){
1829            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1830                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1831                 restartT = CmiWallTimer();
1832            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1833            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1834            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1835            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1836            CmiSetHandler(msg, askProcDataHandlerIdx);
1837            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1838            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1839    }
1840    else{
1841                 CkCallback cb(qd_callback);
1842                 CkStartQD(cb);
1843    }
1844 #else
1845    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1846 #endif
1847 }
1848
1849 // can be called in other files
1850 // return true if it is in restarting
1851 extern "C"
1852 int CkInRestarting()
1853 {
1854 #if CMK_MEM_CHECKPOINT
1855   if (CpvAccess( _crashedNode)!=-1) return 1;
1856   // gzheng
1857   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1858   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1859   return CkMemCheckPT::inRestarting;
1860 #else
1861   return 0;
1862 #endif
1863 }
1864
1865 extern "C"
1866 int CkReplicaAlive()
1867 {
1868 //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1869 //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1870         return CkMemCheckPT::replicaAlive;
1871
1872         /*if(CkMemCheckPT::replicaDead==1)
1873                 return 0;
1874         else            
1875                 return 1;*/
1876 }
1877
1878 extern "C"
1879 int CkInCheckpointing()
1880 {
1881         return CkMemCheckPT::inCheckpointing;
1882 }
1883
1884 extern "C"
1885 void CkSetInLdb(){
1886 #if CMK_MEM_CHECKPOINT
1887         CkMemCheckPT::inLoadbalancing = 1;
1888 #endif
1889 }
1890
1891 extern "C"
1892 int CkInLdb(){
1893 #if CMK_MEM_CHECKPOINT
1894         return CkMemCheckPT::inLoadbalancing;
1895 #endif
1896         return 0;
1897 }
1898
1899 extern "C"
1900 void CkResetInLdb(){
1901 #if CMK_MEM_CHECKPOINT
1902         CkMemCheckPT::inLoadbalancing = 0;
1903 #endif
1904 }
1905
1906 /*****************************************************************************
1907                 module initialization
1908 *****************************************************************************/
1909
1910 static int arg_where = CkCheckPoint_inMEM;
1911
1912 #if CMK_MEM_CHECKPOINT
1913 void init_memcheckpt(char **argv)
1914 {
1915     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1916       arg_where = CkCheckPoint_inDISK;
1917     }
1918
1919         // initiliazing _crashedNode variable
1920         CpvInitialize(int, _crashedNode);
1921         CpvInitialize(int, _remoteCrashedNode);
1922         CpvAccess(_crashedNode) = -1;
1923         CpvAccess(_remoteCrashedNode) = -1;
1924         init_FI();
1925
1926 }
1927 #endif
1928
1929 class CkMemCheckPTInit: public Chare {
1930 public:
1931   CkMemCheckPTInit(CkArgMsg *m) {
1932 #if CMK_MEM_CHECKPOINT
1933     if (arg_where == CkCheckPoint_inDISK) {
1934       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1935     }
1936     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1937     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1938 #endif
1939   }
1940 };
1941
1942 static void notifyHandler(char *msg)
1943 {
1944 #if CMK_MEM_CHECKPOINT
1945   CmiFree(msg);
1946       /* immediately increase restart phase to filter old messages */
1947   CpvAccess(_curRestartPhase) ++;
1948   CpvAccess(_qd)->flushStates();
1949   _discard_charm_message();
1950
1951 #endif
1952 }
1953
1954 extern "C"
1955 void notify_crash(int node)
1956 {
1957 #ifdef CMK_MEM_CHECKPOINT
1958   CpvAccess( _crashedNode) = node;
1959 #ifdef CMK_SMP
1960   for(int i=0;i<CkMyNodeSize();i++){
1961         CpvAccessOther(_crashedNode,i)=node;
1962   }
1963 #endif
1964   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1965   CkMemCheckPT::inRestarting = 1;
1966
1967     // this may be in interrupt handler, send a message to reset QD
1968   int pe = CmiNodeFirst(CkMyNode());
1969   for(int i=0;i<CkMyNodeSize();i++){
1970         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1971         CmiSetHandler(msg, notifyHandlerIdx);
1972         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1973   }
1974 #endif
1975 }
1976
1977 extern "C" void (*notify_crash_fn)(int node);
1978
1979
1980 #if CMK_CONVERSE_MPI
1981 void buddyDieHandler(char *msg)
1982 {
1983 #if CMK_MEM_CHECKPOINT
1984    // notify
1985         CkMemCheckPT::inRestarting = 1;
1986    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1987    notify_crash(diepe);
1988    // send message to crash pe to let it restart
1989    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1990    CkPrintf("[%d] finding newrank\n",CkMyPe());
1991    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1992    CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1993    int buddy = obj->BuddyPE(CmiMyPe());
1994    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1995    if (buddy == diepe)  {
1996      mpi_restart_crashed(diepe, newrank);
1997    }
1998 #endif
1999 }
2000
2001 void pingHandler(void *msg)
2002 {
2003   lastPingTime = CmiWallTimer();
2004   CmiFree(msg);
2005 }
2006
2007 void pingCheckHandler()
2008 {
2009 #if CMK_MEM_CHECKPOINT
2010   double now = CmiWallTimer();
2011   if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2012   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2013     int i, pe, buddy;
2014     // tell everyone the buddy dies
2015     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2016     for (i = 1; i < CmiNumPes(); i++) {
2017        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2018        if (obj->BuddyPE(pe) == CmiMyPe()) break;
2019     }
2020     buddy = pe;
2021     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2022     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
2023       if (obj->isFailed(pe) || pe == buddy) continue;
2024       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2025       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2026       CmiSetHandler(msg, buddyDieHandlerIdx);
2027       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2028     }*/
2029     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2030     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2031     CmiSetHandler(msg, buddyDieHandlerIdx);
2032     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2033     //send to everyone in the other world
2034         if(CmiNumPartition()!=1){
2035                 for(int i=0;i<CmiNumPes();i++){
2036                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2037                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2038                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
2039                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2040                 }
2041         }
2042   }
2043   else 
2044     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2045 #endif
2046 }
2047
2048 void pingBuddy()
2049 {
2050 #if CMK_MEM_CHECKPOINT
2051   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2052   if (obj) {
2053     int buddy = obj->BuddyPE(CkMyPe());
2054     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2055     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2056     CmiSetHandler(msg, pingHandlerIdx);
2057     CmiGetRestartPhase(msg) = 9999;
2058     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2059   }
2060   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2061 #endif
2062 }
2063 #endif
2064
2065 // initproc
2066 void CkRegisterRestartHandler( )
2067 {
2068 #if CMK_MEM_CHECKPOINT
2069   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2070   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2071   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2072   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2073   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2074   
2075   //for replica
2076   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2077   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2078   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2079   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2080   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2081   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2082   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2083   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2084
2085 #if CMK_CONVERSE_MPI
2086   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2087   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2088   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2089 #endif
2090
2091   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2092   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2093   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2094   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2095   CpvInitialize(int,curPointer);
2096   CpvInitialize(int,recvdLocal);
2097   CpvInitialize(int,recvdRemote);
2098   CpvInitialize(int,recvdProcChkp);
2099   CpvInitialize(int,recvdArrayChkp);
2100
2101   CpvAccess(procChkptBuf) = NULL;
2102   CpvAccess(buddyBuf) = NULL;
2103   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2104   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2105   CpvAccess(chkpBuf)[0] = NULL;
2106   CpvAccess(chkpBuf)[1] = NULL;
2107   CpvAccess(localProcChkpBuf)[0] = NULL;
2108   CpvAccess(localProcChkpBuf)[1] = NULL;
2109   
2110   CpvAccess(curPointer) = 0;
2111   CpvAccess(recvdLocal) = 0;
2112   CpvAccess(recvdRemote) = 0;
2113   CpvAccess(recvdProcChkp) = 0;
2114   CpvAccess(recvdArrayChkp) = 0;
2115
2116   notify_crash_fn = notify_crash;
2117
2118 #if ! CMK_CONVERSE_MPI
2119   // print pid to kill
2120 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2121 //  sleep(4);
2122 #endif
2123 #endif
2124 }
2125
2126
2127 extern "C"
2128 int CkHasCheckpoints()
2129 {
2130   return checkpointed;
2131 }
2132
2133 /// @todo: the following definitions should be moved to a separate file containing
2134 // structures and functions about fault tolerance strategies
2135
2136 /**
2137  *  * @brief: function for killing a process                                             
2138  *   */
2139 #ifdef CMK_MEM_CHECKPOINT
2140 #if CMK_HAS_GETPID
2141 void killLocal(void *_dummy,double curWallTime){
2142         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2143         if(CmiWallTimer()<killTime-1){
2144                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2145         }else{ 
2146 #if CMK_CONVERSE_MPI
2147                                 CkDieNow();
2148 #else 
2149                 kill(getpid(),SIGKILL);                                               
2150 #endif
2151         }              
2152
2153 #else
2154 void killLocal(void *_dummy,double curWallTime){
2155   CmiAbort("kill() not supported!");
2156 }
2157 #endif
2158 #endif
2159
2160 #ifdef CMK_MEM_CHECKPOINT
2161 /**
2162  * @brief: reads the file with the kill information
2163  */
2164 void readKillFile(){
2165         FILE *fp=fopen(killFile,"r");
2166         if(!fp){
2167                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2168                 return;
2169         }
2170         int proc;
2171         double sec;
2172         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2173                 if(proc == CkMyNode() && CkMyRank() == 0){
2174                         killTime = CmiWallTimer()+sec;
2175                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2176                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2177                 }
2178         }
2179         fclose(fp);
2180 }
2181
2182 #if ! CMK_CONVERSE_MPI
2183 void CkDieNow()
2184 {
2185 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2186          // ignored for non-mpi version
2187         CmiPrintf("[%d] die now.\n", CmiMyPe());
2188         killTime = CmiWallTimer()+0.001;
2189         CcdCallFnAfter(killLocal,NULL,1);
2190 #endif
2191 }
2192 #endif
2193
2194 #endif
2195
2196 #include "CkMemCheckpoint.def.h"
2197
2198