suppot for ampi
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74 CpvDeclare(int, _remoteCrashedNode);
75
76 // static, so that it is accessible from Converse part
77 int CkMemCheckPT::inRestarting = 0;
78 int CkMemCheckPT::inCheckpointing = 0;
79 int CkMemCheckPT::replicaAlive = 1;
80 int CkMemCheckPT::inLoadbalancing = 0;
81 double CkMemCheckPT::startTime;
82 char *CkMemCheckPT::stage;
83 CkCallback CkMemCheckPT::cpCallback;
84
85 int _memChkptOn = 1;                    // checkpoint is on or off
86
87 CkGroupID ckCheckPTGroupID;             // readonly
88
89 static int checkpointed = 0;
90
91 /// @todo the following declarations should be moved into a separate file for all 
92 // fault tolerant strategies
93
94 #ifdef CMK_MEM_CHECKPOINT
95 // name of the kill file that contains processes to be killed 
96 char *killFile;                                               
97 // flag for the kill file         
98 int killFlag=0;
99 // variable for storing the killing time
100 double killTime=0.0;
101 #endif
102
103 #ifdef CKLOCMGR_LOOP
104 #undef CKLOCMGR_LOOP
105 #endif
106 // loop over all CkLocMgr and do "code"
107 #define  CKLOCMGR_LOOP(code)    {       \
108   int numGroups = CkpvAccess(_groupIDTable)->size();    \
109   for(int i=0;i<numGroups;i++) {        \
110     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
111     if(obj->isLocMgr())  {      \
112       CkLocMgr *mgr = (CkLocMgr*)obj;   \
113       code      \
114     }   \
115   }     \
116  }
117
118 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
119 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
120 CpvDeclare(CkCheckPTMessage**, chkpBuf);
121 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
122 //store the checkpoint of the buddy to compare
123 //do not need the whole msg, can be the checksum
124 CpvDeclare(CkCheckPTMessage*, buddyBuf);
125 //pointer of the checkpoint going to be written
126 CpvDeclare(int, curPointer);
127 CpvDeclare(int, recvdRemote);
128 CpvDeclare(int, recvdLocal);
129 CpvDeclare(int, recvdArrayChkp);
130 CpvDeclare(int, recvdProcChkp);
131
132 bool compare(char * buf1, char * buf2);
133 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
134 // Converse function handles
135 static int askPhaseHandlerIdx;
136 static int recvPhaseHandlerIdx;
137 static int askProcDataHandlerIdx;
138 static int restartBcastHandlerIdx;
139 static int recoverProcDataHandlerIdx;
140 static int restartBeginHandlerIdx;
141 static int recvRemoteChkpHandlerIdx;
142 static int replicaDieHandlerIdx;
143 static int replicaDieBcastHandlerIdx;
144 static int replicaRecoverHandlerIdx;
145 static int recoverRemoteProcDataHandlerIdx;
146 static int recoverRemoteArrayDataHandlerIdx;
147 static int notifyHandlerIdx;
148 // compute the backup processor
149 // FIXME: avoid crashed processors
150 #if CMK_CONVERSE_MPI
151 static int pingHandlerIdx;
152 static int pingCheckHandlerIdx;
153 static int buddyDieHandlerIdx;
154 static double lastPingTime = -1;
155
156 extern "C" void mpi_restart_crashed(int pe, int rank);
157 extern "C" int  find_spare_mpirank(int pe,int partition);
158
159 void pingBuddy();
160 void pingCheckHandler();
161
162 #endif
163 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
164
165 inline int CkMemCheckPT::BuddyPE(int pe)
166 {
167   int budpe;
168 #if NODE_CHECKPOINT
169     // buddy is the processor with same rank on the next physical node
170   int r1 = CmiPhysicalRank(pe);
171   int budnode = CmiPhysicalNodeID(pe);
172   do {
173     budnode = (budnode+1)%CmiNumPhysicalNodes();
174     int *pelist;
175     int num;
176     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
177     budpe = pelist[r1 % num];
178   } while (isFailed(budpe));
179   if (budpe == pe) {
180     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
181     CmiAbort("Failed to find a buddy processor");
182   }
183 #else
184   budpe = pe;
185   while (budpe == pe || isFailed(budpe)) 
186           budpe = (budpe+1)%CkNumPes();
187 #endif
188   return budpe;
189 }
190
191 // called in array element constructor
192 // choose and register with 2 buddies for checkpoiting 
193 #if CMK_MEM_CHECKPOINT
194 void ArrayElement::init_checkpt() {
195         if (_memChkptOn == 0) return;
196         if (CkInRestarting()) {
197           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
198         }
199         // only master init checkpoint
200         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
201
202         budPEs[0] = CkMyPe();
203         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
204         CmiAssert(budPEs[0] != budPEs[1]);
205         // inform checkPTMgr
206         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
207         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
208         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
209         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
210 }
211 #endif
212
213 // entry function invoked by checkpoint mgr asking for checkpoint data
214 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
215 #if CMK_MEM_CHECKPOINT
216 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
217 //char index[128];   thisIndexMax.sprint(index);
218 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
219   CkLocMgr *locMgr = thisArray->getLocMgr();
220   CmiAssert(myRec!=NULL);
221   int size;
222   {
223         PUP::sizer p;
224         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
225         size = p.size();
226   }
227   int packSize = size/sizeof(double) +1;
228   CkArrayCheckPTMessage *msg =
229                  new (packSize, 0) CkArrayCheckPTMessage;
230   msg->len = size;
231   msg->index =thisIndexMax;
232   msg->aid = thisArrayID;
233   msg->locMgr = locMgr->getGroupID();
234   msg->cp_flag = 1;
235   {
236         PUP::toMem p(msg->packData);
237         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
238   }
239
240   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
241   checkptMgr.recvData(msg, 2, budPEs);
242   delete m;
243 #endif
244 }
245
246 // checkpoint holder class - for memory checkpointing
247 class CkMemCheckPTInfo: public CkCheckPTInfo
248 {
249   CkArrayCheckPTMessage *ckBuffer;
250 public:
251   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
252                     CkCheckPTInfo(a, loc, idx, pno)
253   {
254     ckBuffer = NULL;
255   }
256   ~CkMemCheckPTInfo() 
257   {
258     if (ckBuffer) delete ckBuffer; 
259   }
260   inline void updateBuffer(CkArrayCheckPTMessage *data) 
261   {
262     CmiAssert(data!=NULL);
263     if (ckBuffer) delete ckBuffer;
264     ckBuffer = data;
265   }    
266   inline CkArrayCheckPTMessage * getCopy()
267   {
268     if (ckBuffer == NULL) {
269       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
270       CmiAbort("Abort!");
271     }
272     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
273   }     
274   inline void updateBuddy(int b1, int b2) {
275      CmiAssert(ckBuffer);
276      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
277      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
278      CmiAssert(pNo != CkMyPe());
279   }
280   inline int getSize() { 
281      CmiAssert(ckBuffer);
282      return ckBuffer->len; 
283   }
284 };
285
286 // checkpoint holder class - for in-disk checkpointing
287 class CkDiskCheckPTInfo: public CkCheckPTInfo 
288 {
289   char *fname;
290   int bud1, bud2;
291   int len;                      // checkpoint size
292 public:
293   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
294   {
295 #if CMK_USE_MKSTEMP
296     fname = new char[64];
297     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
298     mkstemp(fname);
299 #else
300     fname=tmpnam(NULL);
301 #endif
302     bud1 = bud2 = -1;
303     len = 0;
304   }
305   ~CkDiskCheckPTInfo() 
306   {
307     remove(fname);
308   }
309   inline void updateBuffer(CkArrayCheckPTMessage *data) 
310   {
311     double t = CmiWallTimer();
312     // unpack it
313     envelope *env = UsrToEnv(data);
314     CkUnpackMessage(&env);
315     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
316     FILE *f = fopen(fname,"wb");
317     PUP::toDisk p(f);
318     CkPupMessage(p, (void **)&data);
319     // delay sync to the end because otherwise the messages are blocked
320 //    fsync(fileno(f));
321     fclose(f);
322     bud1 = data->bud1;
323     bud2 = data->bud2;
324     len = data->len;
325     delete data;
326     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
327   }
328   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
329   {
330     CkArrayCheckPTMessage *data;
331     FILE *f = fopen(fname,"rb");
332     PUP::fromDisk p(f);
333     CkPupMessage(p, (void **)&data);
334     fclose(f);
335     data->bud1 = bud1;                          // update the buddies
336     data->bud2 = bud2;
337     return data;
338   }
339   inline void updateBuddy(int b1, int b2) {
340      bud1 = b1; bud2 = b2;
341      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
342      CmiAssert(pNo != CkMyPe());
343   }
344   inline int getSize() { 
345      return len; 
346   }
347 };
348
349 CkMemCheckPT::CkMemCheckPT(int w)
350 {
351   int numnodes = 0;
352 #if NODE_CHECKPOINT
353   numnodes = CmiNumPhysicalNodes();
354 #else
355   numnodes = CkNumPes();
356 #endif
357 #if CK_NO_PROC_POOL
358   if (numnodes <= 2)
359 #else
360   if (numnodes  == 1)
361 #endif
362   {
363     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
364     _memChkptOn = 0;
365   }
366   inRestarting = 0;
367   inCheckpointing = 0;
368   recvCount = peCount = 0;
369   ackCount = 0;
370   expectCount = -1;
371   where = w;
372   replicaAlive = 1;
373 #if CMK_CONVERSE_MPI
374   void pingBuddy();
375   void pingCheckHandler();
376   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
377   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
378 #endif
379   chkpTable[0] = NULL;
380   chkpTable[1] = NULL;
381   maxIter = -1;
382   recvIterCount = 0;
383   localDecided = false;
384 }
385
386 CkMemCheckPT::~CkMemCheckPT()
387 {
388   int len = ckTable.length();
389   for (int i=0; i<len; i++) {
390     delete ckTable[i];
391   }
392 }
393
394 void CkMemCheckPT::pup(PUP::er& p) 
395
396   CBase_CkMemCheckPT::pup(p); 
397   p|cpStarter;
398   p|thisFailedPe;
399   p|failedPes;
400   p|ckCheckPTGroupID;           // recover global variable
401   p|cpCallback;                 // store callback
402   p|where;                      // where to checkpoint
403   p|peCount;
404   if (p.isUnpacking()) {
405     recvCount = 0;
406 #if CMK_CONVERSE_MPI
407     void pingBuddy();
408     void pingCheckHandler();
409     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
410     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
411 #endif
412           maxIter = -1;
413           recvIterCount = 0;
414           localDecided = false;
415   }
416 }
417
418 void CkMemCheckPT::getIter(){
419         localDecided = true;
420         localMaxIter = maxIter+1;
421         contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
422         int elemCount = CkCountChkpSyncElements();
423         if(elemCount == 0){
424                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
425         }
426 }
427
428 //when one replica failes, decide the next chkp iter
429
430 void CkMemCheckPT::recvIter(int iter){
431         if(maxIter<iter){
432                 maxIter=iter;
433         }
434 }
435
436 void CkMemCheckPT::recvMaxIter(int iter){
437         localDecided = false;
438         CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
439 }
440
441 void CkMemCheckPT::reachChkpIter(){
442         recvIterCount++;
443         elemCount = CkCountChkpSyncElements();
444         if(recvIterCount == elemCount){
445                 recvIterCount = 0;
446                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
447         }
448 }
449
450 void CkMemCheckPT::startChkp(){
451         CkPrintf("start checkpoint\n");
452         CkStartMemCheckpoint(cpCallback);
453 }
454
455 // called by checkpoint mgr to restore an array element
456 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
457 {
458 #if CMK_MEM_CHECKPOINT
459   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
460   // m->index.print();
461   PUP::fromMem p(m->packData);
462   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
463   CmiAssert(mgr);
464 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
465   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
466 #else
467   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
468 #endif
469
470   // find a list of array elements bound together
471   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
472   CmiAssert(elt);
473   CkLocRec_local *rec = elt->myRec;
474   CkVec<CkMigratable *> list;
475   mgr->migratableList(rec, list);
476   CmiAssert(list.length() > 0);
477   for (int l=0; l<list.length(); l++) {
478     elt = (ArrayElement *)list[l];
479     elt->budPEs[0] = m->bud1;
480     elt->budPEs[1] = m->bud2;
481     //    reset, may not needed now
482     // for now.
483     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
484       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
485       if (c) c->redNo = 0;
486     }
487   }
488 #endif
489 }
490
491 // return 1 if pe is a crashed processor
492 int CkMemCheckPT::isFailed(int pe)
493 {
494   for (int i=0; i<failedPes.length(); i++)
495     if (failedPes[i] == pe) return 1;
496   return 0;
497 }
498
499 // add pe into history list of all failed processors
500 void CkMemCheckPT::failed(int pe)
501 {
502   if (isFailed(pe)) return;
503   failedPes.push_back(pe);
504 }
505
506 int CkMemCheckPT::totalFailed()
507 {
508   return failedPes.length();
509 }
510
511 // create an checkpoint entry for array element of aid with index.
512 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
513 {
514   // error check, no duplicate
515   int idx, len = ckTable.size();
516   for (idx=0; idx<len; idx++) {
517     CkCheckPTInfo *entry = ckTable[idx];
518     if (index == entry->index) {
519       if (loc == entry->locMgr) {
520           // bindTo array elements
521           return;
522       }
523         // for array inheritance, the following check may fail
524         // because ArrayElement constructor of all superclasses are called
525       if (aid == entry->aid) {
526         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
527         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
528       }
529     }
530   }
531   CkCheckPTInfo *newEntry;
532   if (where == CkCheckPoint_inMEM)
533     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
534   else
535     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
536   ckTable.push_back(newEntry);
537   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
538 }
539
540 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
541 {
542 #if !CMK_CHKP_ALL       
543   int buddy = msg->bud1;
544   if (buddy == CkMyPe()) buddy = msg->bud2;
545   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
546   recvData(msg);
547     // ack
548   thisProxy[buddy].gotData();
549 #else
550   chkpTable[0] = NULL;
551   chkpTable[1] = NULL;
552   recvArrayCheckpoint(msg);
553   thisProxy[msg->bud2].gotData();
554 #endif
555 }
556
557 // loop through my checkpoint table and ask checkpointed array elements
558 // to send me checkpoint data.
559 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
560 {
561   checkpointed = 1;
562   cpCallback = cb;
563   cpStarter = starter;
564   inCheckpointing = 1;
565   if (CkMyPe() == cpStarter) {
566     startTime = CmiWallTimer();
567     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
568   }
569 #if CMK_CONVERSE_MPI
570   if(CmiNumPartition()==1)
571 #endif    
572   {
573
574 #if !CMK_CHKP_ALL
575   int len = ckTable.length();
576   for (int i=0; i<len; i++) {
577     CkCheckPTInfo *entry = ckTable[i];
578       // always let the bigger number processor send request
579     //if (CkMyPe() < entry->pNo) continue;
580       // always let the smaller number processor send request, may on same proc
581     if (!isMaster(entry->pNo)) continue;
582       // call inmem_checkpoint to the array element, ask it to send
583       // back checkpoint data via recvData().
584     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
585     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
586   }
587     // if my table is empty, then I am done
588   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
589 #else
590   startArrayCheckpoint();
591 #endif
592         sendProcData();
593   }
594 #if CMK_CONVERSE_MPI
595   else
596   {
597         startCheckpoint();
598   }
599 #endif
600   // pack and send proc level data
601 }
602
603 class MemElementPacker : public CkLocIterator{
604         private:
605                 CkLocMgr *locMgr;
606                 PUP::er &p;
607         public:
608                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
609                 void addLocation(CkLocation &loc){
610                         CkArrayIndexMax idx = loc.getIndex();
611                         CkGroupID gID = locMgr->ckGetGroupID();
612                         p|gID;
613                         p|idx;
614                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
615                 }
616 };
617
618 void pupAllElements(PUP::er &p){
619 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
620         int numElements;
621         if(!p.isUnpacking()){
622                 numElements = CkCountArrayElements();
623         }
624         p | numElements;
625         if(!p.isUnpacking()){
626                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
627         }
628 #endif
629 }
630
631 void CkMemCheckPT::startArrayCheckpoint(){
632 #if CMK_CHKP_ALL
633         int size;
634         {
635                 PUP::sizer psizer;
636                 pupAllElements(psizer);
637                 size = psizer.size();
638         }
639         int packSize = size/sizeof(double)+1;
640  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
641         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
642         msg->len = size;
643         msg->cp_flag = 1;
644         int budPEs[2];
645         msg->bud1=CkMyPe();
646         msg->bud2=ChkptOnPe(CkMyPe());
647         {
648                 PUP::toMem p(msg->packData);
649                 pupAllElements(p);
650         }
651         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
652         if(chkpTable[0]) delete chkpTable[0];
653         chkpTable[0] = msg;
654         //send the checkpoint to my 
655         recvCount++;
656 #endif
657 }
658
659 void CkMemCheckPT::startCheckpoint(){
660 #if CMK_CONVERSE_MPI
661         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
662     CkPrintf("in start checkpointing!!!!\n");
663   int size;
664   {
665     PUP::sizer p;
666     _handleProcData(p,CmiFalse);
667     size = p.size();
668   }
669         CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
670         procMsg->len = size;
671         procMsg->cp_flag = 1;
672         {
673                 PUP::toMem p(procMsg->packData);
674                 _handleProcData(p,CmiFalse);
675         }
676         int pointer = CpvAccess(curPointer);
677         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
678                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
679         
680         {
681                 PUP::sizer psizer;
682                 pupAllElements(psizer);
683                 size = psizer.size();
684         }
685         CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
686         msg->len = size;
687         msg->cp_flag = 1;
688         {
689                 PUP::toMem p(msg->packData);
690                 pupAllElements(p);
691         }
692         pointer = CpvAccess(curPointer);
693         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
694     CkPrintf("start checkpointing!!!!\n");
695         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
696                 CpvAccess(chkpBuf)[pointer] = msg;
697         if(CkReplicaAlive()==1){
698                 CpvAccess(recvdLocal) = 1;
699                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
700                 CkPackMessage(&env);
701                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
702                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
703         }
704         if(CpvAccess(recvdRemote)==1){
705                 //compare the checkpoint 
706           int size = CpvAccess(chkpBuf)[pointer]->len;
707           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
708                 thisProxy[CkMyPe()].doneComparison(true);
709           }else{
710                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
711                 thisProxy[CkMyPe()].doneComparison(false);
712           }
713   }
714         else{
715                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
716                         {       
717                                 int pointer = CpvAccess(curPointer);
718                                 //send the proc data
719                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
720                                 procMsg->pointer = pointer;
721                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
722                                 CkPackMessage(&env);
723                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
724                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
725                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
726                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
727                                 }
728                         }
729                         //send the array checkpoint data
730                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
731                         msg->pointer = CpvAccess(curPointer);
732                         envelope * env = (envelope *)(UsrToEnv(msg));
733                         CkPackMessage(&env);
734                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
735                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
736                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
737                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
738                         //can continue work, no need to wait for my replica
739                 }
740         }
741 #endif
742 }
743
744 void CkMemCheckPT::doneComparison(bool ret){
745         int _ret = 1;
746         if(!ret){
747         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
748                 _ret = 0;
749         }else{
750                 _ret = 1;
751         }
752         inCheckpointing = 0;
753      CkPrintf("[%d][%d] contribute %d \n", CmiMyPartition(),CkMyPe(),_ret);
754         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
755         contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
756 }
757
758 void CkMemCheckPT::doneRComparison(int ret){
759         CpvAccess(recvdRemote) = 0;
760         CpvAccess(recvdLocal) = 0;
761 //      if(CpvAccess(curPointer) == 0){
762         if(ret==CkNumPes()){
763         CpvAccess(curPointer)^=1;
764                 if(CkMyPe() == 0){
765                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
766                 }
767                 CKLOCMGR_LOOP(mgr->resumeFromChkp(););
768         }
769         else{
770         CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
771                 RollBack();
772         }
773 }
774
775 void CkMemCheckPT::RollBack(){
776         //restore group data
777         checkpointed = 0;
778         CkMemCheckPT::inRestarting = 1;
779         int pointer = CpvAccess(curPointer)^1;//use the previous one
780     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
781         PUP::fromMem p(chkpMsg->packData);      
782         
783         //destroy array elements
784         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
785           int numGroups = CkpvAccess(_groupIDTable)->size();
786           for(int i=0;i<numGroups;i++) {
787                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
788                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
789                 obj->flushStates();
790                 obj->ckJustMigrated();
791           }
792         //restore array elements
793         
794         int numElements;
795         p|numElements;
796         
797         if(p.isUnpacking()){
798                 for(int i=0;i<numElements;i++){
799                 //for(int i=0;i<1;i++){
800                         CkGroupID gID;
801                         CkArrayIndex idx;
802                         p|gID;
803                         p|idx;
804                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
805                         CmiAssert(mgr);
806                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
807                 }
808         }
809         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
810         contribute(cb);
811 }
812
813 void CkMemCheckPT::notifyReplicaDie(int pe){
814         //CkPrintf("[%d] receive replica die\n",CkMyPe());
815         replicaAlive = 0;
816         CpvAccess(_remoteCrashedNode) = pe;
817 }
818
819 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
820 {
821 #if CMK_CHKP_ALL
822         int idx = 1;
823         if(msg->bud1 == CkMyPe()){
824                 idx = 0;
825         }
826         int isChkpting = msg->cp_flag;
827         if(isChkpting == 1){
828                 if(chkpTable[idx]) delete chkpTable[idx];
829         }
830         chkpTable[idx] = msg;
831         if(isChkpting){
832                 recvCount++;
833                 if(recvCount == 2){
834                   if (where == CkCheckPoint_inMEM) {
835                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
836                   }
837                   else if (where == CkCheckPoint_inDISK) {
838                         // another barrier for finalize the writing using fsync
839                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
840                         contribute(0,NULL,CkReduction::sum_int,localcb);
841                   }
842                   else
843                         CmiAbort("Unknown checkpoint scheme");
844                   recvCount = 0;
845                 }
846         }
847 #endif
848 }
849
850 // don't handle array elements
851 static inline void _handleProcData(PUP::er &p, CmiBool create)
852 {
853     // save readonlys, and callback BTW
854     CkPupROData(p);
855
856     // save mainchares 
857     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
858         
859 #ifndef CMK_CHARE_USE_PTR
860     // save non-migratable chare
861     CkPupChareData(p);
862 #endif
863
864     // save groups into Groups.dat
865     CkPupGroupData(p,create);
866
867     // save nodegroups into NodeGroups.dat
868     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
869 }
870
871 void CkMemCheckPT::sendProcData()
872 {
873   // find out size of buffer
874   int size;
875   {
876     PUP::sizer p;
877     _handleProcData(p,CmiTrue);
878     size = p.size();
879   }
880   int packSize = size;
881   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
882   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
883   {
884     PUP::toMem p(msg->packData);
885     _handleProcData(p,CmiTrue);
886   }
887   msg->pe = CkMyPe();
888   msg->len = size;
889   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
890   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
891 }
892
893 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
894 {
895   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
896   CpvAccess(procChkptBuf) = msg;
897   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
898   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
899 }
900
901 // ArrayElement call this function to give us the checkpointed data
902 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
903 {
904   int len = ckTable.length();
905   int idx;
906   for (idx=0; idx<len; idx++) {
907     CkCheckPTInfo *entry = ckTable[idx];
908     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
909   }
910   CkAssert(idx < len);
911   int isChkpting = msg->cp_flag;
912   ckTable[idx]->updateBuffer(msg);
913   if (isChkpting) {
914       // all my array elements have returned their inmem data
915       // inform starter processor that I am done.
916     recvCount ++;
917     if (recvCount == ckTable.length()) {
918       if (where == CkCheckPoint_inMEM) {
919         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
920       }
921       else if (where == CkCheckPoint_inDISK) {
922         // another barrier for finalize the writing using fsync
923         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
924         contribute(0,NULL,CkReduction::sum_int,localcb);
925       }
926       else
927         CmiAbort("Unknown checkpoint scheme");
928       recvCount = 0;
929     } 
930   }
931 }
932
933 // only used in disk checkpointing
934 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
935 {
936   delete m;
937 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
938   system("sync");
939 #endif
940   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
941 }
942
943 // only is called on cpStarter when checkpoint is done
944 void CkMemCheckPT::cpFinish()
945 {
946   CmiAssert(CkMyPe() == cpStarter);
947   peCount++;
948     // now that all processors have finished, activate callback
949   if (peCount == 2) 
950 {
951     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
952     peCount = 0;
953     thisProxy.report();
954   }
955 }
956
957 // for debugging, report checkpoint info
958 void CkMemCheckPT::report()
959 {
960         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
961         inCheckpointing = 0;
962 #if !CMK_CHKP_ALL
963   int objsize = 0;
964   int len = ckTable.length();
965   for (int i=0; i<len; i++) {
966     CkCheckPTInfo *entry = ckTable[i];
967     CmiAssert(entry);
968     objsize += entry->getSize();
969   }
970   CmiAssert(CpvAccess(procChkptBuf));
971   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
972 #else
973   if(CkMyPe()==0)
974   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
975 #endif
976 }
977
978 /*****************************************************************************
979                         RESTART Procedure
980 *****************************************************************************/
981
982 // master processor of two buddies
983 inline int CkMemCheckPT::isMaster(int buddype)
984 {
985 #if 0
986   int mype = CkMyPe();
987 //CkPrintf("ismaster: %d %d\n", pe, mype);
988   if (CkNumPes() - totalFailed() == 2) {
989     return mype > buddype;
990   }
991   for (int i=1; i<CkNumPes(); i++) {
992     int me = (buddype+i)%CkNumPes();
993     if (isFailed(me)) continue;
994     if (me == mype) return 1;
995     else return 0;
996   }
997   return 0;
998 #else
999     // smaller one
1000   int mype = CkMyPe();
1001 //CkPrintf("ismaster: %d %d\n", pe, mype);
1002   if (CkNumPes() - totalFailed() == 2) {
1003     return mype < buddype;
1004   }
1005 #if NODE_CHECKPOINT
1006   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1007   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1008 #else
1009   for (int i=1; i<CkNumPes(); i++) {
1010 #endif
1011     int me = (mype+i)%CkNumPes();
1012     if (isFailed(me)) continue;
1013     if (me == buddype) return 1;
1014     else return 0;
1015   }
1016   return 0;
1017 #endif
1018 }
1019
1020
1021
1022 #if 0
1023 // helper class to pup all elements that belong to same ckLocMgr
1024 class ElementDestoryer : public CkLocIterator {
1025 private:
1026         CkLocMgr *locMgr;
1027 public:
1028         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1029         void addLocation(CkLocation &loc) {
1030                 CkArrayIndex idx=loc.getIndex();
1031                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1032                 loc.destroy();
1033         }
1034 };
1035 #endif
1036
1037 // restore the bitmap vector for LB
1038 void CkMemCheckPT::resetLB(int diepe)
1039 {
1040 #if CMK_LBDB_ON
1041   int i;
1042   char *bitmap = new char[CkNumPes()];
1043   // set processor available bitmap
1044   get_avail_vector(bitmap);
1045
1046   for (i=0; i<failedPes.length(); i++)
1047     bitmap[failedPes[i]] = 0; 
1048   bitmap[diepe] = 0;
1049
1050 #if CK_NO_PROC_POOL
1051   set_avail_vector(bitmap);
1052 #endif
1053
1054   // if I am the crashed pe, rebuild my failedPEs array
1055   if (CkMyNode() == diepe)
1056     for (i=0; i<CkNumPes(); i++) 
1057       if (bitmap[i]==0) failed(i);
1058
1059   delete [] bitmap;
1060 #endif
1061 }
1062
1063 static double restartT;
1064
1065 // in case when failedPe dies, everybody go through its checkpoint table:
1066 // destory all array elements
1067 // recover lost buddies
1068 // reconstruct all array elements from check point data
1069 // called on all processors
1070 void CkMemCheckPT::restart(int diePe)
1071 {
1072 #if CMK_MEM_CHECKPOINT
1073   double curTime = CmiWallTimer();
1074   if (CkMyPe() == diePe){
1075            restartT = CmiWallTimer();
1076     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1077   }
1078   stage = (char*)"resetLB";
1079   startTime = curTime;
1080   if (CkMyPe() == diePe)
1081     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1082
1083 #if CK_NO_PROC_POOL
1084   failed(diePe);        // add into the list of failed pes
1085 #endif
1086   thisFailedPe = diePe;
1087
1088   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1089
1090   inRestarting = 1;
1091                                                                                 
1092   // disable load balancer's barrier
1093   //if (CkMyPe() != diePe) resetLB(diePe);
1094
1095   CKLOCMGR_LOOP(mgr->startInserting(););
1096
1097
1098   if(CmiNumPartition()==1){
1099         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1100   }else{
1101         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1102         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1103   }
1104 #endif
1105 }
1106
1107 // loally remove all array elements
1108 void CkMemCheckPT::removeArrayElements()
1109 {
1110 #if CMK_MEM_CHECKPOINT
1111   int len = ckTable.length();
1112   double curTime = CmiWallTimer();
1113   if (CkMyPe() == thisFailedPe) 
1114     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1115   stage = (char*)"removeArrayElements";
1116   startTime = curTime;
1117
1118 //  if (cpCallback.isInvalid()) 
1119 //        CkPrintf("invalid pe %d\n",CkMyPe());
1120 //        CkAbort("Didn't set restart callback\n");;
1121   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1122
1123   // get rid of all buffering and remote recs
1124   // including destorying all array elements
1125 #if CK_NO_PROC_POOL  
1126         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1127 #else
1128         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1129 #endif
1130   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1131 #endif
1132 }
1133
1134 // flush state in reduction manager
1135 void CkMemCheckPT::resetReductionMgr()
1136 {
1137   if (CkMyPe() == thisFailedPe) 
1138     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1139   int numGroups = CkpvAccess(_groupIDTable)->size();
1140   for(int i=0;i<numGroups;i++) {
1141     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1142     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1143     obj->flushStates();
1144     obj->ckJustMigrated();
1145   }
1146   // reset again
1147   //CpvAccess(_qd)->flushStates();
1148   if(CmiNumPartition()==1){
1149         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1150   }
1151   else
1152         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1153 }
1154
1155 // recover the lost buddies
1156 void CkMemCheckPT::recoverBuddies()
1157 {
1158   int idx;
1159   int len = ckTable.length();
1160   // ready to flush reduction manager
1161   // cannot be CkMemCheckPT::restart because destory will modify states
1162   double curTime = CmiWallTimer();
1163   if (CkMyPe() == thisFailedPe)
1164   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1165   stage = (char *)"recoverBuddies";
1166   if (CkMyPe() == thisFailedPe)
1167   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1168   startTime = curTime;
1169
1170   // recover buddies
1171   expectCount = 0;
1172 #if !CMK_CHKP_ALL
1173   for (idx=0; idx<len; idx++) {
1174     CkCheckPTInfo *entry = ckTable[idx];
1175     if (entry->pNo == thisFailedPe) {
1176 #if CK_NO_PROC_POOL
1177       // find a new buddy
1178       int budPe = BuddyPE(CkMyPe());
1179 #else
1180       int budPe = thisFailedPe;
1181 #endif
1182       CkArrayCheckPTMessage *msg = entry->getCopy();
1183       msg->bud1 = budPe;
1184       msg->bud2 = CkMyPe();
1185       msg->cp_flag = 0;            // not checkpointing
1186       thisProxy[budPe].recoverEntry(msg);
1187       expectCount ++;
1188     }
1189   }
1190 #else
1191   //send to failed pe
1192   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1193 #if CK_NO_PROC_POOL
1194       // find a new buddy
1195       int budPe = BuddyPE(CkMyPe());
1196 #else
1197       int budPe = thisFailedPe;
1198 #endif
1199       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1200       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1201           msg->cp_flag = 0;            // not checkpointing
1202       msg->bud1 = budPe;
1203       msg->bud2 = CkMyPe();
1204       thisProxy[budPe].recoverEntry(msg);
1205       expectCount ++;
1206   }
1207 #endif
1208
1209   if (expectCount == 0) {
1210           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1211   }
1212   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1213 }
1214
1215 void CkMemCheckPT::gotData()
1216 {
1217   ackCount ++;
1218   if (ackCount == expectCount) {
1219     ackCount = 0;
1220     expectCount = -1;
1221     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1222     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1223   }
1224 }
1225
1226 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1227 {
1228
1229   for (int i=0; i<n; i++) {
1230     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1231     mgr->updateLocation(idx[i], nowOnPe);
1232   }
1233         thisProxy[nowOnPe].gotReply();
1234 }
1235
1236 // restore array elements
1237 void CkMemCheckPT::recoverArrayElements()
1238 {
1239   double curTime = CmiWallTimer();
1240   int len = ckTable.length();
1241   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1242   stage = (char *)"recoverArrayElements";
1243   if (CkMyPe() == thisFailedPe)
1244   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1245   startTime = curTime;
1246  int flag = 0;
1247   // recover all array elements
1248   int count = 0;
1249
1250 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1251   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1252   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1253 #endif
1254
1255 #if !CMK_CHKP_ALL
1256   for (int idx=0; idx<len; idx++)
1257   {
1258     CkCheckPTInfo *entry = ckTable[idx];
1259 #if CK_NO_PROC_POOL
1260     // the bigger one will do 
1261 //    if (CkMyPe() < entry->pNo) continue;
1262     if (!isMaster(entry->pNo)) continue;
1263 #else
1264     // smaller one do it, which has the original object
1265     if (CkMyPe() == entry->pNo+1 || 
1266         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1267 #endif
1268 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1269
1270     entry->updateBuddy(CkMyPe(), entry->pNo);
1271     CkArrayCheckPTMessage *msg = entry->getCopy();
1272     // gzheng
1273     //thisProxy[CkMyPe()].inmem_restore(msg);
1274     inmem_restore(msg);
1275 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1276     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1277     int homePe = mgr->homePe(msg->index);
1278     if (homePe != CkMyPe()) {
1279       gmap[homePe].push_back(msg->locMgr);
1280       imap[homePe].push_back(msg->index);
1281     }
1282 #endif
1283     CkFreeMsg(msg);
1284     count ++;
1285   }
1286 #else
1287         char * packData;
1288         if(CmiNumPartition()==1){
1289                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1290                 packData = (char *)msg->packData;
1291         }
1292         else{
1293                 int pointer = CpvAccess(curPointer);
1294                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1295                 packData = msg->packData;
1296         }
1297 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1298         recoverAll(packData,gmap,imap);
1299 #else
1300         recoverAll(packData);
1301 #endif
1302 #endif
1303   curTime = CmiWallTimer();
1304   //if (CkMyPe() == thisFailedPe)
1305   if (CkMyPe() == 0)
1306         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1307 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1308   for (int i=0; i<CkNumPes(); i++) {
1309     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1310       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1311         flag++; 
1312         }
1313   }
1314   delete [] imap;
1315   delete [] gmap;
1316 #endif
1317   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1318
1319   CKLOCMGR_LOOP(mgr->doneInserting(););
1320
1321   // _crashedNode = -1;
1322   CpvAccess(_crashedNode) = -1;
1323   inRestarting = 0;
1324 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1325   if (CkMyPe() == 0)
1326     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1327 #else
1328 if(flag == 0)
1329 {
1330     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1331 }
1332 #endif
1333 }
1334
1335 void CkMemCheckPT::gotReply(){
1336     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1337 }
1338
1339 void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1340 #if CMK_CHKP_ALL
1341         PUP::fromMem p(packData);
1342         int numElements;
1343         p|numElements;
1344         if(p.isUnpacking()){
1345                 for(int i=0;i<numElements;i++){
1346                         CkGroupID gID;
1347                         CkArrayIndex idx;
1348                         p|gID;
1349                         p|idx;
1350                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1351                         int homePe = mgr->homePe(idx);
1352 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1353                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1354 #else
1355                         if(CmiNumPartition()==1)
1356                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1357                         else{
1358                                 if(CkMyPe()==thisFailedPe){
1359                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1360                                 }
1361                         else{
1362                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1363                                 }
1364                         }
1365 #endif
1366 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1367                         homePe = mgr->homePe(idx);
1368                         if (homePe != CkMyPe()) {
1369                           gmap[homePe].push_back(gID);
1370                           imap[homePe].push_back(idx);
1371                         }
1372 #endif
1373                 }
1374         }
1375 #endif
1376 }
1377
1378
1379 // on every processor
1380 // turn load balancer back on
1381 void CkMemCheckPT::finishUp()
1382 {
1383   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1384   //CKLOCMGR_LOOP(mgr->doneInserting(););
1385   
1386   if (CkMyPe() == thisFailedPe)
1387   {
1388        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1389        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1390   }
1391   CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1392         
1393 #if CMK_CONVERSE_MPI    
1394   if(CmiNumPartition()!=1){
1395         CpvAccess(recvdProcChkp) = 0;
1396         CpvAccess(recvdArrayChkp) = 0;
1397         CpvAccess(curPointer)^=1;
1398         //notify my replica, restart is done
1399    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1400    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1401    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1402   }
1403    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1404          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1405    }
1406 #endif
1407
1408 #if CK_NO_PROC_POOL
1409 #if NODE_CHECKPOINT
1410   int numnodes = CmiNumPhysicalNodes();
1411 #else
1412   int numnodes = CkNumPes();
1413 #endif
1414   if (numnodes-totalFailed() <=2) {
1415     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1416     _memChkptOn = 0;
1417   }
1418 #endif
1419 }
1420
1421 void CkMemCheckPT::recoverFromSoftFailure()
1422 {
1423         inRestarting = 0;
1424         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1425 }
1426 // called only on 0
1427 void CkMemCheckPT::quiescence(CkCallback &cb)
1428 {
1429   static int pe_count = 0;
1430   pe_count ++;
1431   CmiAssert(CkMyPe() == 0);
1432   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1433   if (pe_count == CkNumPes()) {
1434     pe_count = 0;
1435     cb.send();
1436   }
1437 }
1438
1439 // User callable function - to start a checkpoint
1440 // callback cb is used to pass control back
1441 void CkStartMemCheckpoint(CkCallback &cb)
1442 {
1443 #if CMK_MEM_CHECKPOINT
1444         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1445   if (_memChkptOn == 0) {
1446     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1447     cb.send();
1448     return;
1449   }
1450   if (CkInRestarting()) {
1451       // trying to checkpointing during restart
1452     cb.send();
1453     return;
1454   }
1455     // store user callback and user data
1456   CkMemCheckPT::cpCallback = cb;
1457
1458     // broadcast to start check pointing
1459   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1460   checkptMgr.doItNow(CkMyPe(), cb);
1461 #else
1462   // when mem checkpoint is disabled, invike cb immediately
1463   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1464   cb.send();
1465 #endif
1466 }
1467
1468 void CkRestartCheckPoint(int diePe)
1469 {
1470 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1471   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1472   // broadcast
1473   checkptMgr.restart(diePe);
1474 }
1475
1476 static int _diePE = -1;
1477
1478 // callback function used locally by ccs handler
1479 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1480 {
1481 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1482   CkRestartCheckPoint(_diePE);
1483 }
1484
1485
1486 // called on crashed PE
1487 static void restartBeginHandler(char *msg)
1488 {
1489   CmiFree(msg);
1490 #if CMK_MEM_CHECKPOINT
1491 #if CMK_USE_BARRIER
1492         if(CkMyPe()!=_diePE){
1493                 printf("restar begin on %d\n",CkMyPe());
1494                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1495                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1496                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1497         }else{
1498         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1499         CkRestartCheckPointCallback(NULL, NULL);
1500         }
1501 #else
1502   static int count = 0;
1503   CmiAssert(CkMyPe() == _diePE);
1504   count ++;
1505   if (count == CkNumPes()) {
1506           printf("restart begin on %d\n",CkMyPe());
1507     CkRestartCheckPointCallback(NULL, NULL);
1508     count = 0;
1509   }
1510 #endif
1511 #endif
1512 }
1513
1514 extern void _discard_charm_message();
1515 extern void _resume_charm_message();
1516
1517 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1518         return data;
1519 }
1520
1521 static void restartBcastHandler(char *msg)
1522 {
1523 #if CMK_MEM_CHECKPOINT
1524   // advance phase counter
1525   CkMemCheckPT::inRestarting = 1;
1526   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1527   // gzheng
1528   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1529
1530   if (CkMyPe()==_diePE)
1531     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1532
1533   // reset QD counters
1534 /*  gzheng
1535   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1536 */
1537
1538 /*  gzheng
1539   if (CkMyPe()==_diePE)
1540       CkRestartCheckPointCallback(NULL, NULL);
1541 */
1542   CmiFree(msg);
1543
1544   _resume_charm_message();
1545
1546     // reduction
1547   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1548   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1549 #if CMK_USE_BARRIER
1550         //CmiPrintf("before reduce\n"); 
1551         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1552         //CmiPrintf("after reduce\n");  
1553 #else
1554   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1555 #endif 
1556  checkpointed = 0;
1557 #endif
1558 }
1559
1560 extern void _initDone();
1561
1562 bool compare(char * buf1, char *buf2){
1563         //buf1 my copy, buf2 from another one 
1564 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1565         PUP::checker pchecker(buf1,buf2);
1566         pchecker.skip();
1567         
1568         int numElements;
1569         pchecker|numElements;
1570 //      CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1571         for(int i=0;i<numElements;i++){
1572         //for(int i=0;i<1;i++){
1573                 CkGroupID gID;
1574                 CkArrayIndex idx;
1575                 
1576                 pchecker|gID;
1577                 pchecker|idx;
1578                 
1579 //              CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1580                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1581                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1582 //              CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1583         }
1584         if(pchecker.getResult()){
1585                 CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
1586         }else{
1587                 CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
1588         }
1589         return pchecker.getResult();
1590 }
1591
1592 static void recvRemoteChkpHandler(char *msg){
1593    envelope *env = (envelope *)msg;
1594    CkUnpackMessage(&env);
1595    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1596   if(CpvAccess(recvdLocal)==1){
1597           int pointer = CpvAccess(curPointer);
1598           int size = CpvAccess(chkpBuf)[pointer]->len;
1599           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1600           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1601                         checkptMgr[CkMyPe()].doneComparison(true);
1602           }else
1603           {
1604                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1605                         checkptMgr[CkMyPe()].doneComparison(false);
1606           }
1607   }else{
1608           CpvAccess(recvdRemote) = 1;
1609           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1610           CpvAccess(buddyBuf) = chkpMsg;
1611   }  
1612 }
1613
1614 static void replicaRecoverHandler(char *msg){
1615         CpvAccess(_remoteCrashedNode) = -1;
1616         CkMemCheckPT::replicaAlive = 1;
1617     bool ret = true;
1618     CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1619 }
1620
1621 static void replicaDieHandler(char * msg){
1622 #if CMK_CONVERSE_MPI    
1623         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1624         CpvAccess(_remoteCrashedNode) = diePe;
1625         CkMemCheckPT::replicaAlive = 0;
1626         find_spare_mpirank(diePe,CmiMyPartition()^1);
1627     if(CkMyPe()==diePe){
1628       CkPrintf("pe %d in replicad word die\n",diePe);
1629             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1630     }
1631 #endif
1632         //broadcast to my partition to get local max iter
1633     if(CkMyPe()==diePe){
1634                 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1635                 checkptMgr.getIter();
1636         }
1637 }
1638
1639
1640 static void replicaDieBcastHandler(char *msg){
1641         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1642         CpvAccess(_remoteCrashedNode) = diePe;
1643         CkMemCheckPT::replicaAlive = 0;
1644 }
1645
1646 static void recoverRemoteProcDataHandler(char *msg){
1647    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1648    envelope *env = (envelope *)msg;
1649    CkUnpackMessage(&env);
1650    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1651         
1652    //store the checkpoint
1653         int pointer = procMsg->pointer;
1654
1655
1656    if(CkMyPe()==CpvAccess(_crashedNode)){
1657            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1658            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1659            PUP::fromMem p(procMsg->packData);
1660            _handleProcData(p,CmiTrue);
1661            _initDone();
1662    }
1663    else{
1664            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1665            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1666            //_handleProcData(p,CmiFalse);
1667    }
1668    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1669    CKLOCMGR_LOOP(mgr->startInserting(););
1670    
1671    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1672    CpvAccess(recvdProcChkp) =1;
1673         if(CpvAccess(recvdArrayChkp)==1){
1674                 _resume_charm_message();
1675                 _diePE = CpvAccess(_crashedNode);
1676                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1677                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1678                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1679         }
1680 }
1681
1682 static void recoverRemoteArrayDataHandler(char *msg){
1683   if(CkMyPe()==CpvAccess(_crashedNode)) 
1684   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1685    envelope *env = (envelope *)msg;
1686    CkUnpackMessage(&env);
1687    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1688         
1689    //store the checkpoint
1690         int pointer = chkpMsg->pointer;
1691         CpvAccess(curPointer) = pointer;
1692         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1693         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1694    CpvAccess(recvdArrayChkp) =1;
1695         CkMemCheckPT::inRestarting = 1;
1696         if(CpvAccess(recvdProcChkp) == 1){
1697                 _resume_charm_message();
1698                 _diePE = CpvAccess(_crashedNode);
1699                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1700                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1701                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1702                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1703         }
1704 }
1705
1706 static void recvPhaseHandler(char * msg)
1707 {
1708         CpvAccess(_curRestartPhase)--;
1709         CkMemCheckPT::inRestarting = 1;
1710         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1711   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1712   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1713   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1714 //              CmiPrintf("start ping check handler\n");
1715 //       }
1716         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1717   // }
1718    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1719    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1720
1721 }
1722 // called on crashed processor
1723 static void recoverProcDataHandler(char *msg)
1724 {
1725 #if CMK_MEM_CHECKPOINT
1726    int i;
1727    envelope *env = (envelope *)msg;
1728    CkUnpackMessage(&env);
1729    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1730    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1731    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1732    PUP::fromMem p(procMsg->packData);
1733    _handleProcData(p,CmiTrue);
1734
1735    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1736    // gzheng
1737    CKLOCMGR_LOOP(mgr->startInserting(););
1738
1739    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1740    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1741    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1742    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1743
1744    _initDone();
1745 //   CpvAccess(_qd)->flushStates();
1746    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1747 #endif
1748 }
1749 //for replica, got the phase number from my neighbor processor in the current partition
1750 static void askPhaseHandler(char *msg)
1751 {
1752 #if CMK_MEM_CHECKPOINT
1753         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1754     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1755         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1756     CmiSetHandler(msg, recvPhaseHandlerIdx);
1757         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1758 #endif
1759 }
1760 // called on its backup processor
1761 // get backup message buffer and sent to crashed processor
1762 static void askProcDataHandler(char *msg)
1763 {
1764 #if CMK_MEM_CHECKPOINT
1765     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1766     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1767     if (CpvAccess(procChkptBuf) == NULL)  {
1768       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1769       CkAbort("no checkpoint found");
1770     }
1771     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1772     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1773
1774     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1775
1776     CkPackMessage(&env);
1777     CmiSetHandler(env, recoverProcDataHandlerIdx);
1778     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1779     CpvAccess(procChkptBuf) = NULL;
1780     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1781 #endif
1782 }
1783
1784 // called on PE 0
1785 void qd_callback(void *m)
1786 {
1787    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1788    CkFreeMsg(m);
1789    if(CmiNumPartition()==1){
1790 #ifdef CMK_SMP
1791            for(int i=0;i<CmiMyNodeSize();i++){
1792            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1793            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1794                 CmiSetHandler(msg, askProcDataHandlerIdx);
1795                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1796                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1797            }
1798            return;
1799 #endif
1800            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1801            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1802            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1803            CmiSetHandler(msg, askProcDataHandlerIdx);
1804            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1805            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1806         }
1807    else{
1808                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1809                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1810                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1811    }
1812 }
1813
1814 // on crashed node
1815 void CkMemRestart(const char *dummy, CkArgMsg *args)
1816 {
1817 #if CMK_MEM_CHECKPOINT
1818    _diePE = CmiMyNode();
1819    CpvAccess( _crashedNode )= CmiMyNode();
1820    CkMemCheckPT::inRestarting = 1;
1821    _discard_charm_message();
1822   
1823   /*if(CmiMyRank()==0){
1824     CkCallback cb(qd_callback);
1825     CkStartQD(cb);
1826     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1827   }*/
1828    if(CmiNumPartition()==1){
1829            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1830                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1831                 restartT = CmiWallTimer();
1832            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1833            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1834            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1835            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1836            CmiSetHandler(msg, askProcDataHandlerIdx);
1837            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1838            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1839    }
1840    else{
1841                 CkCallback cb(qd_callback);
1842                 CkStartQD(cb);
1843    }
1844 #else
1845    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1846 #endif
1847 }
1848
1849 // can be called in other files
1850 // return true if it is in restarting
1851 extern "C"
1852 int CkInRestarting()
1853 {
1854 #if CMK_MEM_CHECKPOINT
1855   if (CpvAccess( _crashedNode)!=-1) return 1;
1856   // gzheng
1857   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1858   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1859   return CkMemCheckPT::inRestarting;
1860 #else
1861   return 0;
1862 #endif
1863 }
1864
1865 extern "C"
1866 int CkReplicaAlive()
1867 {
1868 //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1869 //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1870         return CkMemCheckPT::replicaAlive;
1871
1872         /*if(CkMemCheckPT::replicaDead==1)
1873                 return 0;
1874         else            
1875                 return 1;*/
1876 }
1877
1878 extern "C"
1879 int CkInCheckpointing()
1880 {
1881         return CkMemCheckPT::inCheckpointing;
1882 }
1883
1884 extern "C"
1885 void CkSetInLdb(){
1886 #if CMK_MEM_CHECKPOINT
1887         CkMemCheckPT::inLoadbalancing = 1;
1888 #endif
1889 }
1890
1891 extern "C"
1892 int CkInLdb(){
1893 #if CMK_MEM_CHECKPOINT
1894         return CkMemCheckPT::inLoadbalancing;
1895 #endif
1896         return 0;
1897 }
1898
1899 extern "C"
1900 void CkResetInLdb(){
1901 #if CMK_MEM_CHECKPOINT
1902         CkMemCheckPT::inLoadbalancing = 0;
1903 #endif
1904 }
1905
1906 /*****************************************************************************
1907                 module initialization
1908 *****************************************************************************/
1909
1910 static int arg_where = CkCheckPoint_inMEM;
1911
1912 #if CMK_MEM_CHECKPOINT
1913 void init_memcheckpt(char **argv)
1914 {
1915     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1916       arg_where = CkCheckPoint_inDISK;
1917     }
1918
1919         // initiliazing _crashedNode variable
1920         CpvInitialize(int, _crashedNode);
1921         CpvInitialize(int, _remoteCrashedNode);
1922         CpvAccess(_crashedNode) = -1;
1923         CpvAccess(_remoteCrashedNode) = -1;
1924 }
1925 #endif
1926
1927 class CkMemCheckPTInit: public Chare {
1928 public:
1929   CkMemCheckPTInit(CkArgMsg *m) {
1930 #if CMK_MEM_CHECKPOINT
1931     if (arg_where == CkCheckPoint_inDISK) {
1932       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1933     }
1934     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1935     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1936 #endif
1937   }
1938 };
1939
1940 static void notifyHandler(char *msg)
1941 {
1942 #if CMK_MEM_CHECKPOINT
1943   CmiFree(msg);
1944       /* immediately increase restart phase to filter old messages */
1945   CpvAccess(_curRestartPhase) ++;
1946   CpvAccess(_qd)->flushStates();
1947   _discard_charm_message();
1948
1949 #endif
1950 }
1951
1952 extern "C"
1953 void notify_crash(int node)
1954 {
1955 #ifdef CMK_MEM_CHECKPOINT
1956   CpvAccess( _crashedNode) = node;
1957 #ifdef CMK_SMP
1958   for(int i=0;i<CkMyNodeSize();i++){
1959         CpvAccessOther(_crashedNode,i)=node;
1960   }
1961 #endif
1962   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1963   CkMemCheckPT::inRestarting = 1;
1964
1965     // this may be in interrupt handler, send a message to reset QD
1966   int pe = CmiNodeFirst(CkMyNode());
1967   for(int i=0;i<CkMyNodeSize();i++){
1968         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1969         CmiSetHandler(msg, notifyHandlerIdx);
1970         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1971   }
1972 #endif
1973 }
1974
1975 extern "C" void (*notify_crash_fn)(int node);
1976
1977
1978 #if CMK_CONVERSE_MPI
1979 void buddyDieHandler(char *msg)
1980 {
1981 #if CMK_MEM_CHECKPOINT
1982    // notify
1983         CkMemCheckPT::inRestarting = 1;
1984    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1985    notify_crash(diepe);
1986    // send message to crash pe to let it restart
1987    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1988    CkPrintf("[%d] finding newrank\n",CkMyPe());
1989    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1990    CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1991    int buddy = obj->BuddyPE(CmiMyPe());
1992    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1993    if (buddy == diepe)  {
1994      mpi_restart_crashed(diepe, newrank);
1995    }
1996 #endif
1997 }
1998
1999 void pingHandler(void *msg)
2000 {
2001   lastPingTime = CmiWallTimer();
2002   CmiFree(msg);
2003 }
2004
2005 void pingCheckHandler()
2006 {
2007 #if CMK_MEM_CHECKPOINT
2008   double now = CmiWallTimer();
2009   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2010   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2011     int i, pe, buddy;
2012     // tell everyone the buddy dies
2013     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2014     for (i = 1; i < CmiNumPes(); i++) {
2015        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2016        if (obj->BuddyPE(pe) == CmiMyPe()) break;
2017     }
2018     buddy = pe;
2019     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2020     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
2021       if (obj->isFailed(pe) || pe == buddy) continue;
2022       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2023       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2024       CmiSetHandler(msg, buddyDieHandlerIdx);
2025       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2026     }*/
2027     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2028     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2029     CmiSetHandler(msg, buddyDieHandlerIdx);
2030     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2031     //send to everyone in the other world
2032         if(CmiNumPartition()!=1){
2033                 for(int i=0;i<CmiNumPes();i++){
2034                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2035                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2036                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
2037                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2038                 }
2039         }
2040   }
2041   else 
2042     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2043 #endif
2044 }
2045
2046 void pingBuddy()
2047 {
2048 #if CMK_MEM_CHECKPOINT
2049   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2050   if (obj) {
2051     int buddy = obj->BuddyPE(CkMyPe());
2052     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2053     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2054     CmiSetHandler(msg, pingHandlerIdx);
2055     CmiGetRestartPhase(msg) = 9999;
2056     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2057   }
2058   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2059 #endif
2060 }
2061 #endif
2062
2063 // initproc
2064 void CkRegisterRestartHandler( )
2065 {
2066 #if CMK_MEM_CHECKPOINT
2067   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2068   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2069   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2070   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2071   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2072   
2073   //for replica
2074   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2075   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2076   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2077   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2078   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2079   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2080   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2081   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2082
2083 #if CMK_CONVERSE_MPI
2084   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2085   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2086   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2087 #endif
2088
2089   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2090   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2091   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2092   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2093   CpvInitialize(int,curPointer);
2094   CpvInitialize(int,recvdLocal);
2095   CpvInitialize(int,recvdRemote);
2096   CpvInitialize(int,recvdProcChkp);
2097   CpvInitialize(int,recvdArrayChkp);
2098
2099   CpvAccess(procChkptBuf) = NULL;
2100   CpvAccess(buddyBuf) = NULL;
2101   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2102   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2103   CpvAccess(chkpBuf)[0] = NULL;
2104   CpvAccess(chkpBuf)[1] = NULL;
2105   CpvAccess(localProcChkpBuf)[0] = NULL;
2106   CpvAccess(localProcChkpBuf)[1] = NULL;
2107   
2108   CpvAccess(curPointer) = 0;
2109   CpvAccess(recvdLocal) = 0;
2110   CpvAccess(recvdRemote) = 0;
2111   CpvAccess(recvdProcChkp) = 0;
2112   CpvAccess(recvdArrayChkp) = 0;
2113
2114   notify_crash_fn = notify_crash;
2115
2116 #if ! CMK_CONVERSE_MPI
2117   // print pid to kill
2118 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2119 //  sleep(4);
2120 #endif
2121 #endif
2122 }
2123
2124
2125 extern "C"
2126 int CkHasCheckpoints()
2127 {
2128   return checkpointed;
2129 }
2130
2131 /// @todo: the following definitions should be moved to a separate file containing
2132 // structures and functions about fault tolerance strategies
2133
2134 /**
2135  *  * @brief: function for killing a process                                             
2136  *   */
2137 #ifdef CMK_MEM_CHECKPOINT
2138 #if CMK_HAS_GETPID
2139 void killLocal(void *_dummy,double curWallTime){
2140         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2141         if(CmiWallTimer()<killTime-1){
2142                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2143         }else{ 
2144 #if CMK_CONVERSE_MPI
2145                                 CkDieNow();
2146 #else 
2147                 kill(getpid(),SIGKILL);                                               
2148 #endif
2149         }              
2150
2151 #else
2152 void killLocal(void *_dummy,double curWallTime){
2153   CmiAbort("kill() not supported!");
2154 }
2155 #endif
2156 #endif
2157
2158 #ifdef CMK_MEM_CHECKPOINT
2159 /**
2160  * @brief: reads the file with the kill information
2161  */
2162 void readKillFile(){
2163         FILE *fp=fopen(killFile,"r");
2164         if(!fp){
2165                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2166                 return;
2167         }
2168         int proc;
2169         double sec;
2170         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2171                 if(proc == CkMyNode() && CkMyRank() == 0){
2172                         killTime = CmiWallTimer()+sec;
2173                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2174                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2175                 }
2176         }
2177         fclose(fp);
2178 }
2179
2180 #if ! CMK_CONVERSE_MPI
2181 void CkDieNow()
2182 {
2183 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2184          // ignored for non-mpi version
2185         CmiPrintf("[%d] die now.\n", CmiMyPe());
2186         killTime = CmiWallTimer()+0.001;
2187         CcdCallFnAfter(killLocal,NULL,1);
2188 #endif
2189 }
2190 #endif
2191
2192 #endif
2193
2194 #include "CkMemCheckpoint.def.h"
2195
2196