Merge branch 'replica_ft' of charmgit:charm into replica_ft
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117 }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, localChkpDone);
131 CpvDeclare(int, remoteChkpDone);
132 CpvDeclare(int, recvdArrayChkp);
133 CpvDeclare(int, recvdProcChkp);
134
135 bool compare(char * buf1, char * buf2);
136 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
137 // Converse function handles
138 static int askPhaseHandlerIdx;
139 static int recvPhaseHandlerIdx;
140 static int askProcDataHandlerIdx;
141 static int restartBcastHandlerIdx;
142 static int recoverProcDataHandlerIdx;
143 static int restartBeginHandlerIdx;
144 static int recvRemoteChkpHandlerIdx;
145 static int replicaDieHandlerIdx;
146 static int replicaDieBcastHandlerIdx;
147 static int replicaRecoverHandlerIdx;
148 static int replicaChkpDoneHandlerIdx;
149 static int recoverRemoteProcDataHandlerIdx;
150 static int recoverRemoteArrayDataHandlerIdx;
151 static int notifyHandlerIdx;
152 // compute the backup processor
153 // FIXME: avoid crashed processors
154 #if CMK_CONVERSE_MPI
155 static int pingHandlerIdx;
156 static int pingCheckHandlerIdx;
157 static int buddyDieHandlerIdx;
158 static double lastPingTime = -1;
159
160 extern "C" void mpi_restart_crashed(int pe, int rank);
161 extern "C" int  find_spare_mpirank(int pe,int partition);
162
163 void pingBuddy();
164 void pingCheckHandler();
165
166 #endif
167 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
168
169 inline int CkMemCheckPT::BuddyPE(int pe)
170 {
171   int budpe;
172 #if NODE_CHECKPOINT
173   // buddy is the processor with same rank on the next physical node
174   int r1 = CmiPhysicalRank(pe);
175   int budnode = CmiPhysicalNodeID(pe);
176   do {
177     budnode = (budnode+1)%CmiNumPhysicalNodes();
178     int *pelist;
179     int num;
180     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
181     budpe = pelist[r1 % num];
182   } while (isFailed(budpe));
183   if (budpe == pe) {
184     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
185     CmiAbort("Failed to find a buddy processor");
186   }
187 #else
188   budpe = pe;
189   budpe = (budpe+1)%CkNumPes();
190 #endif
191   return budpe;
192 }
193
194 // called in array element constructor
195 // choose and register with 2 buddies for checkpoiting 
196 #if CMK_MEM_CHECKPOINT
197 void ArrayElement::init_checkpt() {
198   if (_memChkptOn == 0) return;
199   if (CkInRestarting()) {
200     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
201   }
202   // only master init checkpoint
203   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
204
205   budPEs[0] = CkMyPe();
206   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
207   CmiAssert(budPEs[0] != budPEs[1]);
208   // inform checkPTMgr
209   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
210   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
211   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
212   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
213 }
214 #endif
215
216 // entry function invoked by checkpoint mgr asking for checkpoint data
217 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
218 #if CMK_MEM_CHECKPOINT
219   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
220   //char index[128];   thisIndexMax.sprint(index);
221   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
222   CkLocMgr *locMgr = thisArray->getLocMgr();
223   CmiAssert(myRec!=NULL);
224   int size;
225   {
226     PUP::sizer p;
227     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
228     size = p.size();
229   }
230   int packSize = size/sizeof(double) +1;
231   CkArrayCheckPTMessage *msg =
232     new (packSize, 0) CkArrayCheckPTMessage;
233   msg->len = size;
234   msg->index =thisIndexMax;
235   msg->aid = thisArrayID;
236   msg->locMgr = locMgr->getGroupID();
237   msg->cp_flag = 1;
238   {
239     PUP::toMem p(msg->packData);
240     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
241   }
242
243   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
244   checkptMgr.recvData(msg, 2, budPEs);
245   delete m;
246 #endif
247 }
248
249 // checkpoint holder class - for memory checkpointing
250 class CkMemCheckPTInfo: public CkCheckPTInfo
251 {
252   CkArrayCheckPTMessage *ckBuffer;
253   public:
254   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
255     CkCheckPTInfo(a, loc, idx, pno)
256   {
257     ckBuffer = NULL;
258   }
259   ~CkMemCheckPTInfo() 
260   {
261     if (ckBuffer) delete ckBuffer; 
262   }
263   inline void updateBuffer(CkArrayCheckPTMessage *data) 
264   {
265     CmiAssert(data!=NULL);
266     if (ckBuffer) delete ckBuffer;
267     ckBuffer = data;
268   }    
269   inline CkArrayCheckPTMessage * getCopy()
270   {
271     if (ckBuffer == NULL) {
272       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
273       CmiAbort("Abort!");
274     }
275     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
276   }     
277   inline void updateBuddy(int b1, int b2) {
278     CmiAssert(ckBuffer);
279     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
280     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
281     CmiAssert(pNo != CkMyPe());
282   }
283   inline int getSize() { 
284     CmiAssert(ckBuffer);
285     return ckBuffer->len; 
286   }
287 };
288
289 // checkpoint holder class - for in-disk checkpointing
290 class CkDiskCheckPTInfo: public CkCheckPTInfo 
291 {
292   char *fname;
293   int bud1, bud2;
294   int len;                      // checkpoint size
295   public:
296   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
297   {
298 #if CMK_USE_MKSTEMP
299     fname = new char[64];
300     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
301     mkstemp(fname);
302 #else
303     fname=tmpnam(NULL);
304 #endif
305     bud1 = bud2 = -1;
306     len = 0;
307   }
308   ~CkDiskCheckPTInfo() 
309   {
310     remove(fname);
311   }
312   inline void updateBuffer(CkArrayCheckPTMessage *data) 
313   {
314     double t = CmiWallTimer();
315     // unpack it
316     envelope *env = UsrToEnv(data);
317     CkUnpackMessage(&env);
318     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
319     FILE *f = fopen(fname,"wb");
320     PUP::toDisk p(f);
321     CkPupMessage(p, (void **)&data);
322     // delay sync to the end because otherwise the messages are blocked
323     //    fsync(fileno(f));
324     fclose(f);
325     bud1 = data->bud1;
326     bud2 = data->bud2;
327     len = data->len;
328     delete data;
329     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
330   }
331   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
332   {
333     CkArrayCheckPTMessage *data;
334     FILE *f = fopen(fname,"rb");
335     PUP::fromDisk p(f);
336     CkPupMessage(p, (void **)&data);
337     fclose(f);
338     data->bud1 = bud1;                          // update the buddies
339     data->bud2 = bud2;
340     return data;
341   }
342   inline void updateBuddy(int b1, int b2) {
343     bud1 = b1; bud2 = b2;
344     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
345     CmiAssert(pNo != CkMyPe());
346   }
347   inline int getSize() { 
348     return len; 
349   }
350 };
351
352 CkMemCheckPT::CkMemCheckPT(int w)
353 {
354   int numnodes = 0;
355 #if NODE_CHECKPOINT
356   numnodes = CmiNumPhysicalNodes();
357 #else
358   numnodes = CkNumPes();
359 #endif
360 #if CK_NO_PROC_POOL
361   if (numnodes <= 2)
362 #else
363     if (numnodes  == 1)
364 #endif
365     {
366       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
367       _memChkptOn = 0;
368     }
369   inRestarting = 0;
370   inCheckpointing = 0;
371   recvCount = peCount = 0;
372   ackCount = 0;
373   expectCount = -1;
374   where = w;
375   replicaAlive = 1;
376 #if CMK_CONVERSE_MPI
377   void pingBuddy();
378   void pingCheckHandler();
379   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
380   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
381 #endif
382   chkpTable[0] = NULL;
383   chkpTable[1] = NULL;
384   maxIter = -1;
385   recvIterCount = 0;
386   localDecided = false;
387 }
388
389 CkMemCheckPT::~CkMemCheckPT()
390 {
391   int len = ckTable.length();
392   for (int i=0; i<len; i++) {
393     delete ckTable[i];
394   }
395 }
396
397 void CkMemCheckPT::pup(PUP::er& p) 
398
399   CBase_CkMemCheckPT::pup(p); 
400   p|cpStarter;
401   p|thisFailedPe;
402   p|failedPes;
403   p|ckCheckPTGroupID;           // recover global variable
404   p|cpCallback;                 // store callback
405   p|where;                      // where to checkpoint
406   p|peCount;
407   if (p.isUnpacking()) {
408     recvCount = 0;
409 #if CMK_CONVERSE_MPI
410     void pingBuddy();
411     void pingCheckHandler();
412     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
413     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
414 #endif
415     maxIter = -1;
416     recvIterCount = 0;
417     localDecided = false;
418   }
419 }
420
421 void CkMemCheckPT::getIter(){
422   localDecided = true;
423   localMaxIter = maxIter+1;
424   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
425   int elemCount = CkCountChkpSyncElements();
426   if(elemCount == 0){
427     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
428   }
429 }
430
431 //when one replica failes, decide the next chkp iter
432
433 void CkMemCheckPT::recvIter(int iter){
434   if(maxIter<iter){
435     maxIter=iter;
436   }
437 }
438
439 void CkMemCheckPT::recvMaxIter(int iter){
440   localDecided = false;
441   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
442 }
443
444 void CkMemCheckPT::reachChkpIter(){
445   recvIterCount++;
446   elemCount = CkCountChkpSyncElements();
447   if(recvIterCount == elemCount){
448     recvIterCount = 0;
449     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
450   }
451 }
452
453 void CkMemCheckPT::startChkp(){
454   CkPrintf("start checkpoint\n");
455   CkStartMemCheckpoint(cpCallback);
456 }
457
458 // called by checkpoint mgr to restore an array element
459 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
460 {
461 #if CMK_MEM_CHECKPOINT
462   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
463   // m->index.print();
464   PUP::fromMem p(m->packData);
465   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
466   CmiAssert(mgr);
467 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
468   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
469 #else
470   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
471 #endif
472
473   // find a list of array elements bound together
474   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
475   CmiAssert(elt);
476   CkLocRec_local *rec = elt->myRec;
477   CkVec<CkMigratable *> list;
478   mgr->migratableList(rec, list);
479   CmiAssert(list.length() > 0);
480   for (int l=0; l<list.length(); l++) {
481     elt = (ArrayElement *)list[l];
482     elt->budPEs[0] = m->bud1;
483     elt->budPEs[1] = m->bud2;
484     //    reset, may not needed now
485     // for now.
486     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
487       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
488       if (c) c->redNo = 0;
489     }
490   }
491 #endif
492 }
493
494 // return 1 if pe is a crashed processor
495 int CkMemCheckPT::isFailed(int pe)
496 {
497   for (int i=0; i<failedPes.length(); i++)
498     if (failedPes[i] == pe) return 1;
499   return 0;
500 }
501
502 // add pe into history list of all failed processors
503 void CkMemCheckPT::failed(int pe)
504 {
505   if (isFailed(pe)) return;
506   failedPes.push_back(pe);
507 }
508
509 int CkMemCheckPT::totalFailed()
510 {
511   return failedPes.length();
512 }
513
514 // create an checkpoint entry for array element of aid with index.
515 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
516 {
517   // error check, no duplicate
518   int idx, len = ckTable.size();
519   for (idx=0; idx<len; idx++) {
520     CkCheckPTInfo *entry = ckTable[idx];
521     if (index == entry->index) {
522       if (loc == entry->locMgr) {
523         // bindTo array elements
524         return;
525       }
526       // for array inheritance, the following check may fail
527       // because ArrayElement constructor of all superclasses are called
528       if (aid == entry->aid) {
529         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
530         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
531       }
532     }
533   }
534   CkCheckPTInfo *newEntry;
535   if (where == CkCheckPoint_inMEM)
536     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
537   else
538     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
539   ckTable.push_back(newEntry);
540   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
541 }
542
543 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
544 {
545 #if !CMK_CHKP_ALL       
546   int buddy = msg->bud1;
547   if (buddy == CkMyPe()) buddy = msg->bud2;
548   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
549   recvData(msg);
550   // ack
551   thisProxy[buddy].gotData();
552 #else
553   chkpTable[0] = NULL;
554   chkpTable[1] = NULL;
555   recvArrayCheckpoint(msg);
556   thisProxy[msg->bud2].gotData();
557 #endif
558 }
559
560 // loop through my checkpoint table and ask checkpointed array elements
561 // to send me checkpoint data.
562 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
563 {
564   checkpointed = 1;
565   cpCallback = cb;
566   cpStarter = starter;
567   inCheckpointing = 1;
568   if (CkMyPe() == cpStarter) {
569     startTime = CmiWallTimer();
570     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
571   }
572 #if CMK_CONVERSE_MPI
573   if(CmiNumPartition()==1)
574 #endif    
575   {
576
577 #if !CMK_CHKP_ALL
578     int len = ckTable.length();
579     for (int i=0; i<len; i++) {
580       CkCheckPTInfo *entry = ckTable[i];
581       // always let the bigger number processor send request
582       //if (CkMyPe() < entry->pNo) continue;
583       // always let the smaller number processor send request, may on same proc
584       if (!isMaster(entry->pNo)) continue;
585       // call inmem_checkpoint to the array element, ask it to send
586       // back checkpoint data via recvData().
587       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
588       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
589     }
590     // if my table is empty, then I am done
591     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
592 #else
593     startArrayCheckpoint();
594 #endif
595     sendProcData();
596   }
597 #if CMK_CONVERSE_MPI
598   else
599   {
600     startCheckpoint();
601   }
602 #endif
603   // pack and send proc level data
604 }
605
606 class MemElementPacker : public CkLocIterator{
607   private:
608     CkLocMgr *locMgr;
609     PUP::er &p;
610   public:
611     MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
612     void addLocation(CkLocation &loc){
613       CkArrayIndexMax idx = loc.getIndex();
614       CkGroupID gID = locMgr->ckGetGroupID();
615       p|gID;
616       p|idx;
617       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
618     }
619 };
620
621 void pupAllElements(PUP::er &p){
622 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
623   int numElements;
624   if(!p.isUnpacking()){
625     numElements = CkCountArrayElements();
626   }
627   p | numElements;
628   if(!p.isUnpacking()){
629     CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
630   }
631 #endif
632 }
633
634 void CkMemCheckPT::startArrayCheckpoint(){
635 #if CMK_CHKP_ALL
636   int size;
637   {
638     PUP::sizer psizer;
639     pupAllElements(psizer);
640     size = psizer.size();
641   }
642   int packSize = size/sizeof(double)+1;
643   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
644   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
645   msg->len = size;
646   msg->cp_flag = 1;
647   int budPEs[2];
648   msg->bud1=CkMyPe();
649   msg->bud2=ChkptOnPe(CkMyPe());
650   {
651     PUP::toMem p(msg->packData);
652     pupAllElements(p);
653   }
654   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
655   if(chkpTable[0]) delete chkpTable[0];
656   chkpTable[0] = msg;
657   //send the checkpoint to my 
658   recvCount++;
659 #endif
660 }
661
662 void CkMemCheckPT::startCheckpoint(){
663 #if CMK_CONVERSE_MPI
664   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
665     CkPrintf("in start checkpointing!!!!\n");
666   int size;
667   {
668     PUP::sizer p;
669     _handleProcData(p,CmiFalse);
670     size = p.size();
671   }
672   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
673   procMsg->len = size;
674   procMsg->cp_flag = 1;
675   {
676     PUP::toMem p(procMsg->packData);
677     _handleProcData(p,CmiFalse);
678   }
679   int pointer = CpvAccess(curPointer);
680   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
681   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
682
683   {
684     PUP::sizer psizer;
685     pupAllElements(psizer);
686     size = psizer.size();
687   }
688   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
689   msg->len = size;
690   msg->cp_flag = 1;
691   {
692     PUP::toMem p(msg->packData);
693     pupAllElements(p);
694   }
695   pointer = CpvAccess(curPointer);
696   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
697   CpvAccess(chkpBuf)[pointer] = msg;
698   if(CkMyPe()==0)
699     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
700   if(CkReplicaAlive()==1){
701     CpvAccess(recvdLocal) = 1;
702     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
703     CkPackMessage(&env);
704     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
705     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
706   }
707   if(CpvAccess(recvdRemote)==1){
708     //compare the checkpoint 
709     int size = CpvAccess(chkpBuf)[pointer]->len;
710     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
711       thisProxy[CkMyPe()].doneComparison(true);
712     }else{
713       CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
714       thisProxy[CkMyPe()].doneComparison(false);
715     }
716     if(CkMyPe()==0)
717       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
718   }
719   else{
720     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
721       { 
722         int pointer = CpvAccess(curPointer);
723         //send the proc data
724         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
725         procMsg->pointer = pointer;
726         envelope * env = (envelope *)(UsrToEnv(procMsg));
727         CkPackMessage(&env);
728         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
729         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
730         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
731           CkPrintf("[%d] sendProcdata\n",CkMyPe());
732         }
733       }
734       //send the array checkpoint data
735       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
736       msg->pointer = CpvAccess(curPointer);
737       envelope * env = (envelope *)(UsrToEnv(msg));
738       CkPackMessage(&env);
739       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
740       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
741       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
742         CkPrintf("[%d] sendArraydata\n",CkMyPe());
743       //can continue work, no need to wait for my replica
744     }
745   }
746 #endif
747 }
748
749 void CkMemCheckPT::doneComparison(bool ret){
750   int _ret = 1;
751   if(!ret){
752     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
753     _ret = 0;
754   }else{
755     _ret = 1;
756   }
757   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
758   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
759 }
760
761 void CkMemCheckPT::doneRComparison(int ret){
762   //    if(CpvAccess(curPointer) == 0){
763   if(ret==CkNumPes()){
764     CpvAccess(localChkpDone) = 1;
765     if(CpvAccess(remoteChkpDone) ==1){
766       thisProxy.resumeFromChkp();
767     }
768     //notify the replica am done
769     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
770     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
771     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
772   }
773   else{
774     CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
775     thisProxy.RollBack();
776   }
777 }
778
779 void CkMemCheckPT::resumeFromChkp(){
780   CpvAccess(recvdRemote) = 0;
781   CpvAccess(recvdLocal) = 0;
782   CpvAccess(localChkpDone) = 0;
783   CpvAccess(remoteChkpDone) = 0;
784   CpvAccess(curPointer)^=1;
785   inCheckpointing = 0;
786   if(CkMyPe() == 0){
787     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
788   }
789   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
790 }
791
792 void CkMemCheckPT::RollBack(){
793   //restore group data
794   checkpointed = 0;
795   CkMemCheckPT::inRestarting = 1;
796   int pointer = CpvAccess(curPointer)^1;//use the previous one
797   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
798   PUP::fromMem p(chkpMsg->packData);    
799
800   //destroy array elements
801   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
802   int numGroups = CkpvAccess(_groupIDTable)->size();
803   for(int i=0;i<numGroups;i++) {
804     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
805     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
806     obj->flushStates();
807     obj->ckJustMigrated();
808   }
809   //restore array elements
810
811   int numElements;
812   p|numElements;
813
814   if(p.isUnpacking()){
815     for(int i=0;i<numElements;i++){
816       //for(int i=0;i<1;i++){
817       CkGroupID gID;
818       CkArrayIndex idx;
819       p|gID;
820       p|idx;
821       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
822       CmiAssert(mgr);
823       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
824     }
825     }
826     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
827     contribute(cb);
828   }
829
830   void CkMemCheckPT::notifyReplicaDie(int pe){
831     //CkPrintf("[%d] receive replica die\n",CkMyPe());
832     replicaAlive = 0;
833     CpvAccess(_remoteCrashedNode) = pe;
834   }
835
836   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
837   {
838 #if CMK_CHKP_ALL
839     int idx = 1;
840     if(msg->bud1 == CkMyPe()){
841       idx = 0;
842     }
843     int isChkpting = msg->cp_flag;
844     if(isChkpting == 1){
845       if(chkpTable[idx]) delete chkpTable[idx];
846     }
847     chkpTable[idx] = msg;
848     if(isChkpting){
849       recvCount++;
850       if(recvCount == 2){
851         if (where == CkCheckPoint_inMEM) {
852           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
853         }
854         else if (where == CkCheckPoint_inDISK) {
855           // another barrier for finalize the writing using fsync
856           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
857           contribute(0,NULL,CkReduction::sum_int,localcb);
858         }
859         else
860           CmiAbort("Unknown checkpoint scheme");
861         recvCount = 0;
862       }
863     }
864 #endif
865   }
866
867   // don't handle array elements
868   static inline void _handleProcData(PUP::er &p, CmiBool create)
869   {
870     // save readonlys, and callback BTW
871     CkPupROData(p);
872
873     // save mainchares 
874     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
875
876 #ifndef CMK_CHARE_USE_PTR
877     // save non-migratable chare
878     CkPupChareData(p);
879 #endif
880
881     // save groups into Groups.dat
882     CkPupGroupData(p,create);
883
884     // save nodegroups into NodeGroups.dat
885     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
886   }
887
888   void CkMemCheckPT::sendProcData()
889   {
890     // find out size of buffer
891     int size;
892     {
893       PUP::sizer p;
894       _handleProcData(p,CmiTrue);
895       size = p.size();
896     }
897     int packSize = size;
898     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
899     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
900     {
901       PUP::toMem p(msg->packData);
902       _handleProcData(p,CmiTrue);
903     }
904     msg->pe = CkMyPe();
905     msg->len = size;
906     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
907     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
908   }
909
910   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
911   {
912     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
913     CpvAccess(procChkptBuf) = msg;
914     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
915     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
916   }
917
918   // ArrayElement call this function to give us the checkpointed data
919   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
920   {
921     int len = ckTable.length();
922     int idx;
923     for (idx=0; idx<len; idx++) {
924       CkCheckPTInfo *entry = ckTable[idx];
925       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
926     }
927     CkAssert(idx < len);
928     int isChkpting = msg->cp_flag;
929     ckTable[idx]->updateBuffer(msg);
930     if (isChkpting) {
931       // all my array elements have returned their inmem data
932       // inform starter processor that I am done.
933       recvCount ++;
934       if (recvCount == ckTable.length()) {
935         if (where == CkCheckPoint_inMEM) {
936           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
937         }
938         else if (where == CkCheckPoint_inDISK) {
939           // another barrier for finalize the writing using fsync
940           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
941           contribute(0,NULL,CkReduction::sum_int,localcb);
942         }
943         else
944           CmiAbort("Unknown checkpoint scheme");
945         recvCount = 0;
946       } 
947     }
948   }
949
950   // only used in disk checkpointing
951   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
952   {
953     delete m;
954 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
955     system("sync");
956 #endif
957     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
958   }
959
960   // only is called on cpStarter when checkpoint is done
961   void CkMemCheckPT::cpFinish()
962   {
963     CmiAssert(CkMyPe() == cpStarter);
964     peCount++;
965     // now that all processors have finished, activate callback
966     if (peCount == 2) 
967     {
968       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
969       peCount = 0;
970       thisProxy.report();
971     }
972   }
973
974   // for debugging, report checkpoint info
975   void CkMemCheckPT::report()
976   {
977     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
978     inCheckpointing = 0;
979 #if !CMK_CHKP_ALL
980     int objsize = 0;
981     int len = ckTable.length();
982     for (int i=0; i<len; i++) {
983       CkCheckPTInfo *entry = ckTable[i];
984       CmiAssert(entry);
985       objsize += entry->getSize();
986     }
987     CmiAssert(CpvAccess(procChkptBuf));
988     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
989 #else
990     if(CkMyPe()==0)
991       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
992 #endif
993   }
994
995   /*****************************************************************************
996     RESTART Procedure
997    *****************************************************************************/
998
999   // master processor of two buddies
1000   inline int CkMemCheckPT::isMaster(int buddype)
1001   {
1002 #if 0
1003     int mype = CkMyPe();
1004     //CkPrintf("ismaster: %d %d\n", pe, mype);
1005     if (CkNumPes() - totalFailed() == 2) {
1006       return mype > buddype;
1007     }
1008     for (int i=1; i<CkNumPes(); i++) {
1009       int me = (buddype+i)%CkNumPes();
1010       if (isFailed(me)) continue;
1011       if (me == mype) return 1;
1012       else return 0;
1013     }
1014     return 0;
1015 #else
1016     // smaller one
1017     int mype = CkMyPe();
1018     //CkPrintf("ismaster: %d %d\n", pe, mype);
1019     if (CkNumPes() - totalFailed() == 2) {
1020       return mype < buddype;
1021     }
1022 #if NODE_CHECKPOINT
1023     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1024     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1025 #else
1026       for (int i=1; i<CkNumPes(); i++) {
1027 #endif
1028         int me = (mype+i)%CkNumPes();
1029         if (isFailed(me)) continue;
1030         if (me == buddype) return 1;
1031         else return 0;
1032       }
1033       return 0;
1034 #endif
1035     }
1036
1037
1038
1039 #if 0
1040     // helper class to pup all elements that belong to same ckLocMgr
1041     class ElementDestoryer : public CkLocIterator {
1042       private:
1043         CkLocMgr *locMgr;
1044       public:
1045         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1046         void addLocation(CkLocation &loc) {
1047           CkArrayIndex idx=loc.getIndex();
1048           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1049           loc.destroy();
1050         }
1051     };
1052 #endif
1053
1054     // restore the bitmap vector for LB
1055     void CkMemCheckPT::resetLB(int diepe)
1056     {
1057 #if CMK_LBDB_ON
1058       int i;
1059       char *bitmap = new char[CkNumPes()];
1060       // set processor available bitmap
1061       get_avail_vector(bitmap);
1062       for (i=0; i<failedPes.length(); i++)
1063         bitmap[failedPes[i]] = 0; 
1064       bitmap[diepe] = 0;
1065
1066 #if CK_NO_PROC_POOL
1067       set_avail_vector(bitmap);
1068 #endif
1069
1070       // if I am the crashed pe, rebuild my failedPEs array
1071       if (CkMyNode() == diepe)
1072         for (i=0; i<CkNumPes(); i++) 
1073           if (bitmap[i]==0) failed(i);
1074
1075       delete [] bitmap;
1076 #endif
1077     }
1078
1079     static double restartT;
1080
1081     // in case when failedPe dies, everybody go through its checkpoint table:
1082     // destory all array elements
1083     // recover lost buddies
1084     // reconstruct all array elements from check point data
1085     // called on all processors
1086     void CkMemCheckPT::restart(int diePe)
1087     {
1088 #if CMK_MEM_CHECKPOINT
1089       double curTime = CmiWallTimer();
1090       if (CkMyPe() == diePe){
1091         restartT = CmiWallTimer();
1092         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1093       }
1094       stage = (char*)"resetLB";
1095       startTime = curTime;
1096       if (CkMyPe() == diePe)
1097         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1098
1099 #if CK_NO_PROC_POOL
1100       failed(diePe);    // add into the list of failed pes
1101 #endif
1102       thisFailedPe = diePe;
1103
1104       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1105
1106       inRestarting = 1;
1107
1108       // disable load balancer's barrier
1109       //if (CkMyPe() != diePe) resetLB(diePe);
1110
1111       CKLOCMGR_LOOP(mgr->startInserting(););
1112
1113
1114       if(CmiNumPartition()==1){
1115         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1116       }else{
1117         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1118         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1119       }
1120 #endif
1121     }
1122
1123     // loally remove all array elements
1124     void CkMemCheckPT::removeArrayElements()
1125     {
1126 #if CMK_MEM_CHECKPOINT
1127       int len = ckTable.length();
1128       double curTime = CmiWallTimer();
1129       if (CkMyPe() == thisFailedPe) 
1130         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1131       stage = (char*)"removeArrayElements";
1132       startTime = curTime;
1133
1134       //  if (cpCallback.isInvalid()) 
1135       //          CkPrintf("invalid pe %d\n",CkMyPe());
1136       //          CkAbort("Didn't set restart callback\n");;
1137       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1138
1139       // get rid of all buffering and remote recs
1140       // including destorying all array elements
1141 #if CK_NO_PROC_POOL  
1142       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1143 #else
1144       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1145 #endif
1146       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1147 #endif
1148     }
1149
1150     // flush state in reduction manager
1151     void CkMemCheckPT::resetReductionMgr()
1152     {
1153       if (CkMyPe() == thisFailedPe) 
1154         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1155       int numGroups = CkpvAccess(_groupIDTable)->size();
1156       for(int i=0;i<numGroups;i++) {
1157         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1158         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1159         obj->flushStates();
1160         obj->ckJustMigrated();
1161       }
1162       // reset again
1163       //CpvAccess(_qd)->flushStates();
1164       if(CmiNumPartition()==1){
1165         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1166       }
1167       else
1168         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1169     }
1170
1171     // recover the lost buddies
1172     void CkMemCheckPT::recoverBuddies()
1173     {
1174       int idx;
1175       int len = ckTable.length();
1176       // ready to flush reduction manager
1177       // cannot be CkMemCheckPT::restart because destory will modify states
1178       double curTime = CmiWallTimer();
1179       if (CkMyPe() == thisFailedPe)
1180         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1181       stage = (char *)"recoverBuddies";
1182       if (CkMyPe() == thisFailedPe)
1183         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1184       startTime = curTime;
1185
1186       // recover buddies
1187       expectCount = 0;
1188 #if !CMK_CHKP_ALL
1189       for (idx=0; idx<len; idx++) {
1190         CkCheckPTInfo *entry = ckTable[idx];
1191         if (entry->pNo == thisFailedPe) {
1192 #if CK_NO_PROC_POOL
1193           // find a new buddy
1194           int budPe = BuddyPE(CkMyPe());
1195 #else
1196           int budPe = thisFailedPe;
1197 #endif
1198           CkArrayCheckPTMessage *msg = entry->getCopy();
1199           msg->bud1 = budPe;
1200           msg->bud2 = CkMyPe();
1201           msg->cp_flag = 0;            // not checkpointing
1202           thisProxy[budPe].recoverEntry(msg);
1203           expectCount ++;
1204         }
1205       }
1206 #else
1207       //send to failed pe
1208       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1209 #if CK_NO_PROC_POOL
1210         // find a new buddy
1211         int budPe = BuddyPE(CkMyPe());
1212 #else
1213         int budPe = thisFailedPe;
1214 #endif
1215         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1216         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1217         msg->cp_flag = 0;            // not checkpointing
1218         msg->bud1 = budPe;
1219         msg->bud2 = CkMyPe();
1220         thisProxy[budPe].recoverEntry(msg);
1221         expectCount ++;
1222       }
1223 #endif
1224
1225       if (expectCount == 0) {
1226         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1227       }
1228       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1229     }
1230
1231     void CkMemCheckPT::gotData()
1232     {
1233       ackCount ++;
1234       if (ackCount == expectCount) {
1235         ackCount = 0;
1236         expectCount = -1;
1237         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1238         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1239       }
1240     }
1241
1242     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1243     {
1244
1245       for (int i=0; i<n; i++) {
1246         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1247         mgr->updateLocation(idx[i], nowOnPe);
1248       }
1249       thisProxy[nowOnPe].gotReply();
1250     }
1251
1252     // restore array elements
1253     void CkMemCheckPT::recoverArrayElements()
1254     {
1255       if(CmiMyPartition()==1&&CkMyPe()==0){
1256         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1257         fflush(stdout);
1258       }
1259       double curTime = CmiWallTimer();
1260       int len = ckTable.length();
1261       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1262       stage = (char *)"recoverArrayElements";
1263       if (CkMyPe() == thisFailedPe)
1264         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1265       startTime = curTime;
1266       int flag = 0;
1267       // recover all array elements
1268       int count = 0;
1269
1270 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1271       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1272       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1273 #endif
1274
1275 #if !CMK_CHKP_ALL
1276       for (int idx=0; idx<len; idx++)
1277       {
1278         CkCheckPTInfo *entry = ckTable[idx];
1279 #if CK_NO_PROC_POOL
1280         // the bigger one will do 
1281         //    if (CkMyPe() < entry->pNo) continue;
1282         if (!isMaster(entry->pNo)) continue;
1283 #else
1284         // smaller one do it, which has the original object
1285         if (CkMyPe() == entry->pNo+1 || 
1286             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1287 #endif
1288         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1289
1290         entry->updateBuddy(CkMyPe(), entry->pNo);
1291         CkArrayCheckPTMessage *msg = entry->getCopy();
1292         // gzheng
1293         //thisProxy[CkMyPe()].inmem_restore(msg);
1294         inmem_restore(msg);
1295 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1296         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1297         int homePe = mgr->homePe(msg->index);
1298         if (homePe != CkMyPe()) {
1299           gmap[homePe].push_back(msg->locMgr);
1300           imap[homePe].push_back(msg->index);
1301         }
1302 #endif
1303         CkFreeMsg(msg);
1304         count ++;
1305       }
1306 #else
1307       char * packData;
1308       if(CmiNumPartition()==1){
1309         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1310         packData = (char *)msg->packData;
1311       }
1312       else{
1313         int pointer = CpvAccess(curPointer);
1314         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1315         packData = msg->packData;
1316       }
1317 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1318       recoverAll(packData,gmap,imap);
1319 #else
1320       recoverAll(packData);
1321 #endif
1322 #endif
1323       curTime = CmiWallTimer();
1324 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1325       for (int i=0; i<CkNumPes(); i++) {
1326         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1327           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1328           flag++;       
1329         }
1330       }
1331       delete [] imap;
1332       delete [] gmap;
1333 #endif
1334       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1335
1336       CKLOCMGR_LOOP(mgr->doneInserting(););
1337
1338       // _crashedNode = -1;
1339       CpvAccess(_crashedNode) = -1;
1340 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1341       if (CkMyPe() == 0)
1342         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1343 #else
1344       if(flag == 0)
1345       {
1346         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1347       }
1348 #endif
1349       if(CmiMyPartition()==1&&CkMyPe()==0){
1350         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1351         fflush(stdout);
1352       }
1353     }
1354
1355     void CkMemCheckPT::gotReply(){
1356       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1357     }
1358
1359     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1360       if(CmiMyPartition()==1&&CkMyPe()==0){
1361         CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1362         fflush(stdout);
1363       }
1364 #if CMK_CHKP_ALL
1365       PUP::fromMem p(packData);
1366       int numElements;
1367       p|numElements;
1368       if(p.isUnpacking()){
1369         for(int i=0;i<numElements;i++){
1370           CkGroupID gID;
1371           CkArrayIndex idx;
1372           p|gID;
1373           p|idx;
1374           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1375           int homePe = mgr->homePe(idx);
1376 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1377           mgr->resume(idx,p,CmiTrue,CmiTrue);
1378 #else
1379           if(CmiNumPartition()==1)
1380             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1381           else{
1382             if(CkMyPe()==thisFailedPe){
1383               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1384             }
1385             else{
1386               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1387             }
1388           }
1389 #endif
1390 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1391           homePe = mgr->homePe(idx);
1392           if (homePe != CkMyPe()) {
1393             gmap[homePe].push_back(gID);
1394             imap[homePe].push_back(idx);
1395           }
1396 #endif
1397         }
1398       }
1399 #endif
1400       if(CmiMyPartition()==1&&CkMyPe()==0){
1401         CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1402         fflush(stdout);
1403       }
1404     }
1405
1406
1407     // on every processor
1408     // turn load balancer back on
1409     void CkMemCheckPT::finishUp()
1410     {
1411       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1412       //CKLOCMGR_LOOP(mgr->doneInserting(););
1413
1414       if (CkMyPe() == thisFailedPe)
1415       {
1416         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1417         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1418         fflush(stdout);
1419       }
1420       fflush(stdout);
1421       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1422       inRestarting = 0;
1423       if(CmiMyPartition()==0){
1424         CmiPrintf("[%d] Resume Done\n", CkMyPe());
1425         fflush(stdout);
1426       }
1427
1428 #if CMK_CONVERSE_MPI    
1429       if(CmiNumPartition()!=1){
1430         CpvAccess(recvdProcChkp) = 0;
1431         CpvAccess(recvdArrayChkp) = 0;
1432         CpvAccess(curPointer)^=1;
1433         //notify my replica, restart is done
1434         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1435         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1436         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1437       }
1438       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1439         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1440       }
1441 #endif
1442
1443 #if CK_NO_PROC_POOL
1444 #if NODE_CHECKPOINT
1445       int numnodes = CmiNumPhysicalNodes();
1446 #else
1447       int numnodes = CkNumPes();
1448 #endif
1449       if (numnodes-totalFailed() <=2) {
1450         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1451         _memChkptOn = 0;
1452       }
1453 #endif
1454     }
1455
1456     void CkMemCheckPT::recoverFromSoftFailure()
1457     {
1458       inRestarting = 0;
1459       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1460     }
1461     // called only on 0
1462     void CkMemCheckPT::quiescence(CkCallback &cb)
1463     {
1464       static int pe_count = 0;
1465       pe_count ++;
1466       CmiAssert(CkMyPe() == 0);
1467       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1468       if (pe_count == CkNumPes()) {
1469         pe_count = 0;
1470         cb.send();
1471       }
1472     }
1473
1474     // User callable function - to start a checkpoint
1475     // callback cb is used to pass control back
1476     void CkStartMemCheckpoint(CkCallback &cb)
1477     {
1478 #if CMK_MEM_CHECKPOINT
1479       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1480       if (_memChkptOn == 0) {
1481         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1482         cb.send();
1483         return;
1484       }
1485       if (CkInRestarting()) {
1486         // trying to checkpointing during restart
1487         cb.send();
1488         return;
1489       }
1490       // store user callback and user data
1491       CkMemCheckPT::cpCallback = cb;
1492
1493       // broadcast to start check pointing
1494       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1495       checkptMgr.doItNow(CkMyPe(), cb);
1496 #else
1497       // when mem checkpoint is disabled, invike cb immediately
1498       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1499       cb.send();
1500 #endif
1501     }
1502
1503     void CkRestartCheckPoint(int diePe)
1504     {
1505       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1506       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1507       // broadcast
1508       checkptMgr.restart(diePe);
1509     }
1510
1511     static int _diePE = -1;
1512
1513     // callback function used locally by ccs handler
1514     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1515     {
1516       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1517       CkRestartCheckPoint(_diePE);
1518     }
1519
1520
1521     // called on crashed PE
1522     static void restartBeginHandler(char *msg)
1523     {
1524       CmiFree(msg);
1525 #if CMK_MEM_CHECKPOINT
1526 #if CMK_USE_BARRIER
1527       if(CkMyPe()!=_diePE){
1528         printf("restar begin on %d\n",CkMyPe());
1529         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1530         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1531         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1532       }else{
1533         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1534         CkRestartCheckPointCallback(NULL, NULL);
1535       }
1536 #else
1537       static int count = 0;
1538       CmiAssert(CkMyPe() == _diePE);
1539       count ++;
1540       if (count == CkNumPes()) {
1541         printf("restart begin on %d\n",CkMyPe());
1542         CkRestartCheckPointCallback(NULL, NULL);
1543         count = 0;
1544       }
1545 #endif
1546 #endif
1547     }
1548
1549     extern void _discard_charm_message();
1550     extern void _resume_charm_message();
1551
1552     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1553       return data;
1554     }
1555
1556     static void restartBcastHandler(char *msg)
1557     {
1558 #if CMK_MEM_CHECKPOINT
1559       // advance phase counter
1560       CkMemCheckPT::inRestarting = 1;
1561       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1562       // gzheng
1563       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1564
1565       if (CkMyPe()==_diePE)
1566         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1567
1568       // reset QD counters
1569       /*  gzheng
1570           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1571        */
1572
1573       /*  gzheng
1574           if (CkMyPe()==_diePE)
1575           CkRestartCheckPointCallback(NULL, NULL);
1576        */
1577       CmiFree(msg);
1578
1579       _resume_charm_message();
1580
1581       // reduction
1582       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1583       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1584 #if CMK_USE_BARRIER
1585       //CmiPrintf("before reduce\n");   
1586       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1587       //CmiPrintf("after reduce\n");    
1588 #else
1589       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1590 #endif 
1591       checkpointed = 0;
1592 #endif
1593     }
1594
1595     extern void _initDone();
1596
1597     bool compare(char * buf1, char *buf2){
1598         PUP::checker pchecker(buf1,buf2);
1599                 pchecker.skip();
1600
1601                 int numElements;
1602                 pchecker|numElements;
1603                 for(int i=0;i<numElements;i++){
1604       //for(int i=0;i<1;i++){
1605       CkGroupID gID;
1606       CkArrayIndex idx;
1607
1608       pchecker|gID;
1609       pchecker|idx;
1610
1611       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1612       mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1613       }
1614       return pchecker.getResult();
1615       //return true;
1616     }
1617
1618     static void recvRemoteChkpHandler(char *msg){
1619       envelope *env = (envelope *)msg;
1620       CkUnpackMessage(&env);
1621       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1622       if(CpvAccess(recvdLocal)==1){
1623         int pointer = CpvAccess(curPointer);
1624         int size = CpvAccess(chkpBuf)[pointer]->len;
1625         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1626           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1627         }else
1628         {
1629           CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1630           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1631         }
1632         delete chkpMsg;
1633         if(CkMyPe()==0)
1634           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1635       }else{
1636         CpvAccess(recvdRemote) = 1;
1637         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1638         CpvAccess(buddyBuf) = chkpMsg;
1639       }  
1640     }
1641
1642     static void replicaRecoverHandler(char *msg){
1643       CpvAccess(_remoteCrashedNode) = -1;
1644       CkMemCheckPT::replicaAlive = 1;
1645       //fflush(stdout);
1646       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1647       bool ret = true;
1648       CpvAccess(remoteChkpDone) = 1;
1649       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1650       CmiFree(msg);
1651   
1652     }
1653     static void replicaChkpDoneHandler(char *msg){
1654       CpvAccess(remoteChkpDone) = 1;
1655       if(CpvAccess(localChkpDone) == 1)
1656         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1657       CmiFree(msg);
1658     }
1659
1660     static void replicaDieHandler(char * msg){
1661 #if CMK_CONVERSE_MPI    
1662       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1663       CpvAccess(_remoteCrashedNode) = diePe;
1664       CkMemCheckPT::replicaAlive = 0;
1665       find_spare_mpirank(diePe,CmiMyPartition()^1);
1666       if(CkMyPe()==diePe){
1667         CkPrintf("pe %d in replicad word die\n",diePe);
1668         CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1669       }
1670 #endif
1671       //broadcast to my partition to get local max iter
1672       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1673       CmiFree(msg);
1674     }
1675
1676
1677     static void replicaDieBcastHandler(char *msg){
1678       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1679       CpvAccess(_remoteCrashedNode) = diePe;
1680       CkMemCheckPT::replicaAlive = 0;
1681       CmiFree(msg);
1682     }
1683
1684     static void recoverRemoteProcDataHandler(char *msg){
1685       if(CmiMyPartition()==1&&CkMyPe()==0){
1686         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1687         fflush(stdout);
1688       }
1689       envelope *env = (envelope *)msg;
1690       CkUnpackMessage(&env);
1691       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1692
1693       //store the checkpoint
1694       int pointer = procMsg->pointer;
1695
1696
1697       if(CkMyPe()==CpvAccess(_crashedNode)){
1698         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1699         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1700         PUP::fromMem p(procMsg->packData);
1701         _handleProcData(p,CmiTrue);
1702         _initDone();
1703       }
1704       else{
1705         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1706         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1707         //_handleProcData(p,CmiFalse);
1708       }
1709       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1710       CKLOCMGR_LOOP(mgr->startInserting(););
1711
1712       CpvAccess(recvdProcChkp) =1;
1713       if(CpvAccess(recvdArrayChkp)==1){
1714         _resume_charm_message();
1715         _diePE = CpvAccess(_crashedNode);
1716         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1717         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1718         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1719       }
1720       if(CmiMyPartition()==1&&CkMyPe()==0){
1721         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1722         fflush(stdout);
1723       }
1724     }
1725
1726     static void recoverRemoteArrayDataHandler(char *msg){
1727       envelope *env = (envelope *)msg;
1728       CkUnpackMessage(&env);
1729       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1730       if(CmiMyPartition()==1&&CkMyPe()==0){
1731         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1732         fflush(stdout);
1733       }
1734
1735       //store the checkpoint
1736       int pointer = chkpMsg->pointer;
1737       CpvAccess(curPointer) = pointer;
1738       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1739       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1740       CpvAccess(recvdArrayChkp) =1;
1741       CkMemCheckPT::inRestarting = 1;
1742       if(CpvAccess(recvdProcChkp) == 1){
1743         _resume_charm_message();
1744         _diePE = CpvAccess(_crashedNode);
1745         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1746         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1747         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1748         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1749       }
1750       if(CmiMyPartition()==1&&CkMyPe()==0){
1751         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1752         fflush(stdout);
1753       }
1754     }
1755
1756     static void recvPhaseHandler(char * msg)
1757     {
1758       CpvAccess(_curRestartPhase)--;
1759       CkMemCheckPT::inRestarting = 1;
1760       CmiFree(msg);
1761     }
1762     // called on crashed processor
1763     static void recoverProcDataHandler(char *msg)
1764     {
1765 #if CMK_MEM_CHECKPOINT
1766       int i;
1767       envelope *env = (envelope *)msg;
1768       CkUnpackMessage(&env);
1769       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1770       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1771       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1772       PUP::fromMem p(procMsg->packData);
1773       _handleProcData(p,CmiTrue);
1774
1775       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1776       // gzheng
1777       CKLOCMGR_LOOP(mgr->startInserting(););
1778
1779       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1780       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1781       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1782       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1783
1784       _initDone();
1785       //   CpvAccess(_qd)->flushStates();
1786       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1787 #endif
1788     }
1789     //for replica, got the phase number from my neighbor processor in the current partition
1790     static void askPhaseHandler(char *msg)
1791     {
1792 #if CMK_MEM_CHECKPOINT
1793       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1794       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1795       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1796       CmiSetHandler(msg, recvPhaseHandlerIdx);
1797       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1798 #endif
1799     }
1800     // called on its backup processor
1801     // get backup message buffer and sent to crashed processor
1802     static void askProcDataHandler(char *msg)
1803     {
1804 #if CMK_MEM_CHECKPOINT
1805       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1806       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1807       if (CpvAccess(procChkptBuf) == NULL)  {
1808         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1809         CkAbort("no checkpoint found");
1810       }
1811       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1812       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1813
1814       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1815
1816       CkPackMessage(&env);
1817       CmiSetHandler(env, recoverProcDataHandlerIdx);
1818       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1819       CpvAccess(procChkptBuf) = NULL;
1820       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1821 #endif
1822     }
1823
1824     // called on PE 0
1825     void qd_callback(void *m)
1826     {
1827       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1828       CkFreeMsg(m);
1829       if(CmiNumPartition()==1){
1830 #ifdef CMK_SMP
1831         for(int i=0;i<CmiMyNodeSize();i++){
1832           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1833           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1834           CmiSetHandler(msg, askProcDataHandlerIdx);
1835           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1836           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1837         }
1838         return;
1839 #endif
1840         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1841         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1842         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1843         CmiSetHandler(msg, askProcDataHandlerIdx);
1844         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1845         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1846       }
1847       else{
1848         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1849         CmiSetHandler(msg, recvPhaseHandlerIdx);
1850         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1851       }
1852     }
1853
1854     // on crashed node
1855     void CkMemRestart(const char *dummy, CkArgMsg *args)
1856     {
1857 #if CMK_MEM_CHECKPOINT
1858       _diePE = CmiMyNode();
1859       CpvAccess( _crashedNode )= CmiMyNode();
1860       CkMemCheckPT::inRestarting = 1;
1861       _discard_charm_message();
1862
1863       /*if(CmiMyRank()==0){
1864         CkCallback cb(qd_callback);
1865         CkStartQD(cb);
1866         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1867         }*/
1868       if(CmiNumPartition()==1){
1869         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1870         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1871         restartT = CmiWallTimer();
1872         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1873         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1874         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1875         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1876         CmiSetHandler(msg, askProcDataHandlerIdx);
1877         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1878         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1879       }
1880       else{
1881         CkCallback cb(qd_callback);
1882         CkStartQD(cb);
1883       }
1884 #else
1885       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1886 #endif
1887     }
1888
1889     // can be called in other files
1890     // return true if it is in restarting
1891     extern "C"
1892       int CkInRestarting()
1893       {
1894 #if CMK_MEM_CHECKPOINT
1895         if (CpvAccess( _crashedNode)!=-1) return 1;
1896         // gzheng
1897         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1898         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1899         return CkMemCheckPT::inRestarting;
1900 #else
1901         return 0;
1902 #endif
1903       }
1904
1905     extern "C"
1906       int CkReplicaAlive()
1907       {
1908         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1909         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1910         return CkMemCheckPT::replicaAlive;
1911
1912         /*if(CkMemCheckPT::replicaDead==1)
1913           return 0;
1914           else          
1915           return 1;*/
1916       }
1917
1918     extern "C"
1919       int CkInCheckpointing()
1920       {
1921         return CkMemCheckPT::inCheckpointing;
1922       }
1923
1924     extern "C"
1925       void CkSetInLdb(){
1926 #if CMK_MEM_CHECKPOINT
1927         CkMemCheckPT::inLoadbalancing = 1;
1928 #endif
1929       }
1930
1931     extern "C"
1932       int CkInLdb(){
1933 #if CMK_MEM_CHECKPOINT
1934         return CkMemCheckPT::inLoadbalancing;
1935 #endif
1936         return 0;
1937       }
1938
1939     extern "C"
1940       void CkResetInLdb(){
1941 #if CMK_MEM_CHECKPOINT
1942         CkMemCheckPT::inLoadbalancing = 0;
1943 #endif
1944       }
1945
1946     /*****************************************************************************
1947       module initialization
1948      *****************************************************************************/
1949
1950     static int arg_where = CkCheckPoint_inMEM;
1951
1952 #if CMK_MEM_CHECKPOINT
1953     void init_memcheckpt(char **argv)
1954     {
1955       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1956         arg_where = CkCheckPoint_inDISK;
1957       }
1958       // initiliazing _crashedNode variable
1959       CpvInitialize(int, _crashedNode);
1960       CpvInitialize(int, _remoteCrashedNode);
1961       CpvAccess(_crashedNode) = -1;
1962       CpvAccess(_remoteCrashedNode) = -1;
1963       init_FI(argv);
1964
1965     }
1966 #endif
1967
1968     class CkMemCheckPTInit: public Chare {
1969       public:
1970         CkMemCheckPTInit(CkArgMsg *m) {
1971 #if CMK_MEM_CHECKPOINT
1972           if (arg_where == CkCheckPoint_inDISK) {
1973             CkPrintf("Charm++> Double-disk Checkpointing. \n");
1974           }
1975           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1976           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1977 #endif
1978         }
1979     };
1980
1981     static void notifyHandler(char *msg)
1982     {
1983 #if CMK_MEM_CHECKPOINT
1984       CmiFree(msg);
1985       /* immediately increase restart phase to filter old messages */
1986       CpvAccess(_curRestartPhase) ++;
1987       CpvAccess(_qd)->flushStates();
1988       _discard_charm_message();
1989
1990 #endif
1991     }
1992
1993     extern "C"
1994       void notify_crash(int node)
1995       {
1996 #ifdef CMK_MEM_CHECKPOINT
1997         CpvAccess( _crashedNode) = node;
1998 #ifdef CMK_SMP
1999         for(int i=0;i<CkMyNodeSize();i++){
2000           CpvAccessOther(_crashedNode,i)=node;
2001         }
2002 #endif
2003         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2004         CkMemCheckPT::inRestarting = 1;
2005
2006         // this may be in interrupt handler, send a message to reset QD
2007         int pe = CmiNodeFirst(CkMyNode());
2008         for(int i=0;i<CkMyNodeSize();i++){
2009           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2010           CmiSetHandler(msg, notifyHandlerIdx);
2011           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2012         }
2013 #endif
2014       }
2015
2016     extern "C" void (*notify_crash_fn)(int node);
2017
2018
2019 #if CMK_CONVERSE_MPI
2020     void buddyDieHandler(char *msg)
2021     {
2022 #if CMK_MEM_CHECKPOINT
2023       // notify
2024       CkMemCheckPT::inRestarting = 1;
2025       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2026       notify_crash(diepe);
2027       // send message to crash pe to let it restart
2028       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2029       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2030       int buddy = obj->BuddyPE(CmiMyPe());
2031       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2032       if (buddy == diepe)  {
2033         mpi_restart_crashed(diepe, newrank);
2034       }
2035 #endif
2036     }
2037
2038     void pingHandler(void *msg)
2039     {
2040       lastPingTime = CmiWallTimer();
2041       CmiFree(msg);
2042     }
2043
2044     void pingCheckHandler()
2045     {
2046 #if CMK_MEM_CHECKPOINT
2047       double now = CmiWallTimer();
2048       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2049         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2050         int i, pe, buddy;
2051         // tell everyone the buddy dies
2052         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2053         for (i = 1; i < CmiNumPes(); i++) {
2054           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2055           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2056         }
2057         buddy = pe;
2058         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2059         fflush(stdout);
2060         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2061         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2062         CmiSetHandler(msg, buddyDieHandlerIdx);
2063         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2064         //send to everyone in the other world
2065         if(CmiNumPartition()!=1){
2066           for(int i=0;i<CmiNumPes();i++){
2067             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2068             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2069             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2070             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2071           }
2072         }
2073       }
2074         else 
2075           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2076 #endif
2077       }
2078
2079       void pingBuddy()
2080       {
2081 #if CMK_MEM_CHECKPOINT
2082         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2083         if (obj) {
2084           int buddy = obj->BuddyPE(CkMyPe());
2085           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2086           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2087           CmiSetHandler(msg, pingHandlerIdx);
2088           CmiGetRestartPhase(msg) = 9999;
2089           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2090         }
2091         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2092 #endif
2093       }
2094 #endif
2095
2096       // initproc
2097       void CkRegisterRestartHandler( )
2098       {
2099 #if CMK_MEM_CHECKPOINT
2100         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2101         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2102         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2103         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2104         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2105
2106         //for replica
2107         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2108         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2109         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2110         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2111         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2112         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2113         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2114         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2115         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2116
2117 #if CMK_CONVERSE_MPI
2118         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2119         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2120         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2121 #endif
2122
2123         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2124         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2125         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2126         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2127         CpvInitialize(int,curPointer);
2128         CpvInitialize(int,recvdLocal);
2129         CpvInitialize(int,localChkpDone);
2130         CpvInitialize(int,remoteChkpDone);
2131         CpvInitialize(int,recvdRemote);
2132         CpvInitialize(int,recvdProcChkp);
2133         CpvInitialize(int,recvdArrayChkp);
2134
2135         CpvAccess(procChkptBuf) = NULL;
2136         CpvAccess(buddyBuf) = NULL;
2137         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2138         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2139         CpvAccess(chkpBuf)[0] = NULL;
2140         CpvAccess(chkpBuf)[1] = NULL;
2141         CpvAccess(localProcChkpBuf)[0] = NULL;
2142         CpvAccess(localProcChkpBuf)[1] = NULL;
2143
2144         CpvAccess(curPointer) = 0;
2145         CpvAccess(recvdLocal) = 0;
2146         CpvAccess(localChkpDone) = 0;
2147         CpvAccess(remoteChkpDone) = 0;
2148         CpvAccess(recvdRemote) = 0;
2149         CpvAccess(recvdProcChkp) = 0;
2150         CpvAccess(recvdArrayChkp) = 0;
2151
2152         notify_crash_fn = notify_crash;
2153
2154 #if ! CMK_CONVERSE_MPI
2155         // print pid to kill
2156         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2157         //  sleep(4);
2158 #endif
2159 #endif
2160       }
2161
2162
2163       extern "C"
2164         int CkHasCheckpoints()
2165         {
2166           return checkpointed;
2167         }
2168
2169       /// @todo: the following definitions should be moved to a separate file containing
2170       // structures and functions about fault tolerance strategies
2171
2172       /**
2173        *  * @brief: function for killing a process                                             
2174        *   */
2175 #ifdef CMK_MEM_CHECKPOINT
2176 #if CMK_HAS_GETPID
2177       void killLocal(void *_dummy,double curWallTime){
2178         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2179         if(CmiWallTimer()<killTime-1){
2180           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2181         }else{ 
2182 #if CMK_CONVERSE_MPI
2183           CkDieNow();
2184 #else 
2185           kill(getpid(),SIGKILL);                                               
2186 #endif
2187         }              
2188       } 
2189 #else
2190       void killLocal(void *_dummy,double curWallTime){
2191         CmiAbort("kill() not supported!");
2192       }
2193 #endif
2194 #endif
2195
2196 #ifdef CMK_MEM_CHECKPOINT
2197       /**
2198        * @brief: reads the file with the kill information
2199        */
2200       void readKillFile(){
2201         FILE *fp=fopen(killFile,"r");
2202         if(!fp){
2203           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2204           return;
2205         }
2206         int proc;
2207         double sec;
2208         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2209           if(proc == CkMyNode() && CkMyRank() == 0){
2210             killTime = CmiWallTimer()+sec;
2211             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2212             CcdCallFnAfter(killLocal,NULL,sec*1000);
2213           }
2214         }
2215         fclose(fp);
2216       }
2217
2218 #if ! CMK_CONVERSE_MPI
2219       void CkDieNow()
2220       {
2221 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2222         // ignored for non-mpi version
2223         CmiPrintf("[%d] die now.\n", CmiMyPe());
2224         killTime = CmiWallTimer()+0.001;
2225         CcdCallFnAfter(killLocal,NULL,1);
2226 #endif
2227       }
2228 #endif
2229
2230 #endif
2231
2232 #include "CkMemCheckpoint.def.h"
2233
2234