change detection time
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117  }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, recvdArrayChkp);
131 CpvDeclare(int, recvdProcChkp);
132
133 bool compare(char * buf1, char * buf2);
134 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
135 // Converse function handles
136 static int askPhaseHandlerIdx;
137 static int recvPhaseHandlerIdx;
138 static int askProcDataHandlerIdx;
139 static int restartBcastHandlerIdx;
140 static int recoverProcDataHandlerIdx;
141 static int restartBeginHandlerIdx;
142 static int recvRemoteChkpHandlerIdx;
143 static int replicaDieHandlerIdx;
144 static int replicaDieBcastHandlerIdx;
145 static int replicaRecoverHandlerIdx;
146 static int recoverRemoteProcDataHandlerIdx;
147 static int recoverRemoteArrayDataHandlerIdx;
148 static int notifyHandlerIdx;
149 // compute the backup processor
150 // FIXME: avoid crashed processors
151 #if CMK_CONVERSE_MPI
152 static int pingHandlerIdx;
153 static int pingCheckHandlerIdx;
154 static int buddyDieHandlerIdx;
155 static double lastPingTime = -1;
156
157 extern "C" void mpi_restart_crashed(int pe, int rank);
158 extern "C" int  find_spare_mpirank(int pe,int partition);
159
160 void pingBuddy();
161 void pingCheckHandler();
162
163 #endif
164 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
165
166 inline int CkMemCheckPT::BuddyPE(int pe)
167 {
168   int budpe;
169 #if NODE_CHECKPOINT
170     // buddy is the processor with same rank on the next physical node
171   int r1 = CmiPhysicalRank(pe);
172   int budnode = CmiPhysicalNodeID(pe);
173   do {
174     budnode = (budnode+1)%CmiNumPhysicalNodes();
175     int *pelist;
176     int num;
177     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
178     budpe = pelist[r1 % num];
179   } while (isFailed(budpe));
180   if (budpe == pe) {
181     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
182     CmiAbort("Failed to find a buddy processor");
183   }
184 #else
185   budpe = pe;
186   while (budpe == pe || isFailed(budpe)) 
187           budpe = (budpe+1)%CkNumPes();
188 #endif
189   return budpe;
190 }
191
192 // called in array element constructor
193 // choose and register with 2 buddies for checkpoiting 
194 #if CMK_MEM_CHECKPOINT
195 void ArrayElement::init_checkpt() {
196         if (_memChkptOn == 0) return;
197         if (CkInRestarting()) {
198           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
199         }
200         // only master init checkpoint
201         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
202
203         budPEs[0] = CkMyPe();
204         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
205         CmiAssert(budPEs[0] != budPEs[1]);
206         // inform checkPTMgr
207         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
208         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
209         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
210         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
211 }
212 #endif
213
214 // entry function invoked by checkpoint mgr asking for checkpoint data
215 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
216 #if CMK_MEM_CHECKPOINT
217 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
218 //char index[128];   thisIndexMax.sprint(index);
219 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
220   CkLocMgr *locMgr = thisArray->getLocMgr();
221   CmiAssert(myRec!=NULL);
222   int size;
223   {
224         PUP::sizer p;
225         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
226         size = p.size();
227   }
228   int packSize = size/sizeof(double) +1;
229   CkArrayCheckPTMessage *msg =
230                  new (packSize, 0) CkArrayCheckPTMessage;
231   msg->len = size;
232   msg->index =thisIndexMax;
233   msg->aid = thisArrayID;
234   msg->locMgr = locMgr->getGroupID();
235   msg->cp_flag = 1;
236   {
237         PUP::toMem p(msg->packData);
238         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
239   }
240
241   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
242   checkptMgr.recvData(msg, 2, budPEs);
243   delete m;
244 #endif
245 }
246
247 // checkpoint holder class - for memory checkpointing
248 class CkMemCheckPTInfo: public CkCheckPTInfo
249 {
250   CkArrayCheckPTMessage *ckBuffer;
251 public:
252   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
253                     CkCheckPTInfo(a, loc, idx, pno)
254   {
255     ckBuffer = NULL;
256   }
257   ~CkMemCheckPTInfo() 
258   {
259     if (ckBuffer) delete ckBuffer; 
260   }
261   inline void updateBuffer(CkArrayCheckPTMessage *data) 
262   {
263     CmiAssert(data!=NULL);
264     if (ckBuffer) delete ckBuffer;
265     ckBuffer = data;
266   }    
267   inline CkArrayCheckPTMessage * getCopy()
268   {
269     if (ckBuffer == NULL) {
270       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
271       CmiAbort("Abort!");
272     }
273     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
274   }     
275   inline void updateBuddy(int b1, int b2) {
276      CmiAssert(ckBuffer);
277      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      CmiAssert(ckBuffer);
283      return ckBuffer->len; 
284   }
285 };
286
287 // checkpoint holder class - for in-disk checkpointing
288 class CkDiskCheckPTInfo: public CkCheckPTInfo 
289 {
290   char *fname;
291   int bud1, bud2;
292   int len;                      // checkpoint size
293 public:
294   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
295   {
296 #if CMK_USE_MKSTEMP
297     fname = new char[64];
298     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
299     mkstemp(fname);
300 #else
301     fname=tmpnam(NULL);
302 #endif
303     bud1 = bud2 = -1;
304     len = 0;
305   }
306   ~CkDiskCheckPTInfo() 
307   {
308     remove(fname);
309   }
310   inline void updateBuffer(CkArrayCheckPTMessage *data) 
311   {
312     double t = CmiWallTimer();
313     // unpack it
314     envelope *env = UsrToEnv(data);
315     CkUnpackMessage(&env);
316     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
317     FILE *f = fopen(fname,"wb");
318     PUP::toDisk p(f);
319     CkPupMessage(p, (void **)&data);
320     // delay sync to the end because otherwise the messages are blocked
321 //    fsync(fileno(f));
322     fclose(f);
323     bud1 = data->bud1;
324     bud2 = data->bud2;
325     len = data->len;
326     delete data;
327     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
328   }
329   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
330   {
331     CkArrayCheckPTMessage *data;
332     FILE *f = fopen(fname,"rb");
333     PUP::fromDisk p(f);
334     CkPupMessage(p, (void **)&data);
335     fclose(f);
336     data->bud1 = bud1;                          // update the buddies
337     data->bud2 = bud2;
338     return data;
339   }
340   inline void updateBuddy(int b1, int b2) {
341      bud1 = b1; bud2 = b2;
342      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
343      CmiAssert(pNo != CkMyPe());
344   }
345   inline int getSize() { 
346      return len; 
347   }
348 };
349
350 CkMemCheckPT::CkMemCheckPT(int w)
351 {
352   int numnodes = 0;
353 #if NODE_CHECKPOINT
354   numnodes = CmiNumPhysicalNodes();
355 #else
356   numnodes = CkNumPes();
357 #endif
358 #if CK_NO_PROC_POOL
359   if (numnodes <= 2)
360 #else
361   if (numnodes  == 1)
362 #endif
363   {
364     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
365     _memChkptOn = 0;
366   }
367   inRestarting = 0;
368   inCheckpointing = 0;
369   recvCount = peCount = 0;
370   ackCount = 0;
371   expectCount = -1;
372   where = w;
373   replicaAlive = 1;
374 #if CMK_CONVERSE_MPI
375   void pingBuddy();
376   void pingCheckHandler();
377   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
378   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
379 #endif
380   chkpTable[0] = NULL;
381   chkpTable[1] = NULL;
382   maxIter = -1;
383   recvIterCount = 0;
384   localDecided = false;
385 }
386
387 CkMemCheckPT::~CkMemCheckPT()
388 {
389   int len = ckTable.length();
390   for (int i=0; i<len; i++) {
391     delete ckTable[i];
392   }
393 }
394
395 void CkMemCheckPT::pup(PUP::er& p) 
396
397   CBase_CkMemCheckPT::pup(p); 
398   p|cpStarter;
399   p|thisFailedPe;
400   p|failedPes;
401   p|ckCheckPTGroupID;           // recover global variable
402   p|cpCallback;                 // store callback
403   p|where;                      // where to checkpoint
404   p|peCount;
405   if (p.isUnpacking()) {
406     recvCount = 0;
407 #if CMK_CONVERSE_MPI
408     void pingBuddy();
409     void pingCheckHandler();
410     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
411     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
412 #endif
413           maxIter = -1;
414           recvIterCount = 0;
415           localDecided = false;
416   }
417 }
418
419 void CkMemCheckPT::getIter(){
420         localDecided = true;
421         localMaxIter = maxIter+1;
422         contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
423         int elemCount = CkCountChkpSyncElements();
424         if(elemCount == 0){
425                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
426         }
427 }
428
429 //when one replica failes, decide the next chkp iter
430
431 void CkMemCheckPT::recvIter(int iter){
432         if(maxIter<iter){
433                 maxIter=iter;
434         }
435 }
436
437 void CkMemCheckPT::recvMaxIter(int iter){
438         localDecided = false;
439         CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
440 }
441
442 void CkMemCheckPT::reachChkpIter(){
443         recvIterCount++;
444         elemCount = CkCountChkpSyncElements();
445         if(recvIterCount == elemCount){
446                 recvIterCount = 0;
447                 contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
448         }
449 }
450
451 void CkMemCheckPT::startChkp(){
452         CkPrintf("start checkpoint\n");
453         CkStartMemCheckpoint(cpCallback);
454 }
455
456 // called by checkpoint mgr to restore an array element
457 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
458 {
459 #if CMK_MEM_CHECKPOINT
460   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
461   // m->index.print();
462   PUP::fromMem p(m->packData);
463   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
464   CmiAssert(mgr);
465 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
466   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
467 #else
468   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
469 #endif
470
471   // find a list of array elements bound together
472   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
473   CmiAssert(elt);
474   CkLocRec_local *rec = elt->myRec;
475   CkVec<CkMigratable *> list;
476   mgr->migratableList(rec, list);
477   CmiAssert(list.length() > 0);
478   for (int l=0; l<list.length(); l++) {
479     elt = (ArrayElement *)list[l];
480     elt->budPEs[0] = m->bud1;
481     elt->budPEs[1] = m->bud2;
482     //    reset, may not needed now
483     // for now.
484     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
485       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
486       if (c) c->redNo = 0;
487     }
488   }
489 #endif
490 }
491
492 // return 1 if pe is a crashed processor
493 int CkMemCheckPT::isFailed(int pe)
494 {
495   for (int i=0; i<failedPes.length(); i++)
496     if (failedPes[i] == pe) return 1;
497   return 0;
498 }
499
500 // add pe into history list of all failed processors
501 void CkMemCheckPT::failed(int pe)
502 {
503   if (isFailed(pe)) return;
504   failedPes.push_back(pe);
505 }
506
507 int CkMemCheckPT::totalFailed()
508 {
509   return failedPes.length();
510 }
511
512 // create an checkpoint entry for array element of aid with index.
513 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
514 {
515   // error check, no duplicate
516   int idx, len = ckTable.size();
517   for (idx=0; idx<len; idx++) {
518     CkCheckPTInfo *entry = ckTable[idx];
519     if (index == entry->index) {
520       if (loc == entry->locMgr) {
521           // bindTo array elements
522           return;
523       }
524         // for array inheritance, the following check may fail
525         // because ArrayElement constructor of all superclasses are called
526       if (aid == entry->aid) {
527         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
528         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
529       }
530     }
531   }
532   CkCheckPTInfo *newEntry;
533   if (where == CkCheckPoint_inMEM)
534     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
535   else
536     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
537   ckTable.push_back(newEntry);
538   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
539 }
540
541 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
542 {
543 #if !CMK_CHKP_ALL       
544   int buddy = msg->bud1;
545   if (buddy == CkMyPe()) buddy = msg->bud2;
546   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
547   recvData(msg);
548     // ack
549   thisProxy[buddy].gotData();
550 #else
551   chkpTable[0] = NULL;
552   chkpTable[1] = NULL;
553   recvArrayCheckpoint(msg);
554   thisProxy[msg->bud2].gotData();
555 #endif
556 }
557
558 // loop through my checkpoint table and ask checkpointed array elements
559 // to send me checkpoint data.
560 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
561 {
562   checkpointed = 1;
563   cpCallback = cb;
564   cpStarter = starter;
565   inCheckpointing = 1;
566   if (CkMyPe() == cpStarter) {
567     startTime = CmiWallTimer();
568     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
569   }
570 #if CMK_CONVERSE_MPI
571   if(CmiNumPartition()==1)
572 #endif    
573   {
574
575 #if !CMK_CHKP_ALL
576   int len = ckTable.length();
577   for (int i=0; i<len; i++) {
578     CkCheckPTInfo *entry = ckTable[i];
579       // always let the bigger number processor send request
580     //if (CkMyPe() < entry->pNo) continue;
581       // always let the smaller number processor send request, may on same proc
582     if (!isMaster(entry->pNo)) continue;
583       // call inmem_checkpoint to the array element, ask it to send
584       // back checkpoint data via recvData().
585     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
586     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
587   }
588     // if my table is empty, then I am done
589   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
590 #else
591   startArrayCheckpoint();
592 #endif
593         sendProcData();
594   }
595 #if CMK_CONVERSE_MPI
596   else
597   {
598         startCheckpoint();
599   }
600 #endif
601   // pack and send proc level data
602 }
603
604 class MemElementPacker : public CkLocIterator{
605         private:
606                 CkLocMgr *locMgr;
607                 PUP::er &p;
608         public:
609                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
610                 void addLocation(CkLocation &loc){
611                         CkArrayIndexMax idx = loc.getIndex();
612                         CkGroupID gID = locMgr->ckGetGroupID();
613                         p|gID;
614                         p|idx;
615                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
616                 }
617 };
618
619 void pupAllElements(PUP::er &p){
620 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
621         int numElements;
622         if(!p.isUnpacking()){
623                 numElements = CkCountArrayElements();
624         }
625         p | numElements;
626         if(!p.isUnpacking()){
627                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
628         }
629 #endif
630 }
631
632 void CkMemCheckPT::startArrayCheckpoint(){
633 #if CMK_CHKP_ALL
634         int size;
635         {
636                 PUP::sizer psizer;
637                 pupAllElements(psizer);
638                 size = psizer.size();
639         }
640         int packSize = size/sizeof(double)+1;
641  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
642         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
643         msg->len = size;
644         msg->cp_flag = 1;
645         int budPEs[2];
646         msg->bud1=CkMyPe();
647         msg->bud2=ChkptOnPe(CkMyPe());
648         {
649                 PUP::toMem p(msg->packData);
650                 pupAllElements(p);
651         }
652         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
653         if(chkpTable[0]) delete chkpTable[0];
654         chkpTable[0] = msg;
655         //send the checkpoint to my 
656         recvCount++;
657 #endif
658 }
659
660 void CkMemCheckPT::startCheckpoint(){
661 #if CMK_CONVERSE_MPI
662         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
663     CkPrintf("in start checkpointing!!!!\n");
664   int size;
665   {
666     PUP::sizer p;
667     _handleProcData(p,CmiFalse);
668     size = p.size();
669   }
670         CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
671         procMsg->len = size;
672         procMsg->cp_flag = 1;
673         {
674                 PUP::toMem p(procMsg->packData);
675                 _handleProcData(p,CmiFalse);
676         }
677         int pointer = CpvAccess(curPointer);
678         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
679                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
680         
681         {
682                 PUP::sizer psizer;
683                 pupAllElements(psizer);
684                 size = psizer.size();
685         }
686         CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
687         msg->len = size;
688         msg->cp_flag = 1;
689         {
690                 PUP::toMem p(msg->packData);
691                 pupAllElements(p);
692         }
693         pointer = CpvAccess(curPointer);
694         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
695     CkPrintf("start checkpointing!!!!\n");
696         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
697                 CpvAccess(chkpBuf)[pointer] = msg;
698         if(CkReplicaAlive()==1){
699                 CpvAccess(recvdLocal) = 1;
700                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
701                 CkPackMessage(&env);
702                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
703                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
704         }
705         if(CpvAccess(recvdRemote)==1){
706                 //compare the checkpoint 
707           int size = CpvAccess(chkpBuf)[pointer]->len;
708           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
709                 thisProxy[CkMyPe()].doneComparison(true);
710           }else{
711                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
712                 thisProxy[CkMyPe()].doneComparison(false);
713           }
714   }
715         else{
716                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
717                         {       
718                                 int pointer = CpvAccess(curPointer);
719                                 //send the proc data
720                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
721                                 procMsg->pointer = pointer;
722                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
723                                 CkPackMessage(&env);
724                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
725                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
726                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
727                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
728                                 }
729                         }
730                         //send the array checkpoint data
731                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
732                         msg->pointer = CpvAccess(curPointer);
733                         envelope * env = (envelope *)(UsrToEnv(msg));
734                         CkPackMessage(&env);
735                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
736                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
737                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
738                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
739                         //can continue work, no need to wait for my replica
740                 }
741         }
742 #endif
743 }
744
745 void CkMemCheckPT::doneComparison(bool ret){
746         int _ret = 1;
747         if(!ret){
748         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
749                 _ret = 0;
750         }else{
751                 _ret = 1;
752         }
753         inCheckpointing = 0;
754         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
755         contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
756 }
757
758 void CkMemCheckPT::doneRComparison(int ret){
759         CpvAccess(recvdRemote) = 0;
760         CpvAccess(recvdLocal) = 0;
761 //      if(CpvAccess(curPointer) == 0){
762         if(ret==CkNumPes()){
763         CpvAccess(curPointer)^=1;
764                 if(CkMyPe() == 0){
765                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
766                 }
767                 CKLOCMGR_LOOP(mgr->resumeFromChkp(););
768         }
769         else{
770         CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
771                 RollBack();
772         }
773 }
774
775 void CkMemCheckPT::RollBack(){
776         //restore group data
777         checkpointed = 0;
778         CkMemCheckPT::inRestarting = 1;
779         int pointer = CpvAccess(curPointer)^1;//use the previous one
780     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
781         PUP::fromMem p(chkpMsg->packData);      
782         
783         //destroy array elements
784         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
785           int numGroups = CkpvAccess(_groupIDTable)->size();
786           for(int i=0;i<numGroups;i++) {
787                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
788                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
789                 obj->flushStates();
790                 obj->ckJustMigrated();
791           }
792         //restore array elements
793         
794         int numElements;
795         p|numElements;
796         
797         if(p.isUnpacking()){
798                 for(int i=0;i<numElements;i++){
799                 //for(int i=0;i<1;i++){
800                         CkGroupID gID;
801                         CkArrayIndex idx;
802                         p|gID;
803                         p|idx;
804                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
805                         CmiAssert(mgr);
806                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
807                 }
808         }
809         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
810         contribute(cb);
811 }
812
813 void CkMemCheckPT::notifyReplicaDie(int pe){
814         //CkPrintf("[%d] receive replica die\n",CkMyPe());
815         replicaAlive = 0;
816         CpvAccess(_remoteCrashedNode) = pe;
817 }
818
819 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
820 {
821 #if CMK_CHKP_ALL
822         int idx = 1;
823         if(msg->bud1 == CkMyPe()){
824                 idx = 0;
825         }
826         int isChkpting = msg->cp_flag;
827         if(isChkpting == 1){
828                 if(chkpTable[idx]) delete chkpTable[idx];
829         }
830         chkpTable[idx] = msg;
831         if(isChkpting){
832                 recvCount++;
833                 if(recvCount == 2){
834                   if (where == CkCheckPoint_inMEM) {
835                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
836                   }
837                   else if (where == CkCheckPoint_inDISK) {
838                         // another barrier for finalize the writing using fsync
839                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
840                         contribute(0,NULL,CkReduction::sum_int,localcb);
841                   }
842                   else
843                         CmiAbort("Unknown checkpoint scheme");
844                   recvCount = 0;
845                 }
846         }
847 #endif
848 }
849
850 // don't handle array elements
851 static inline void _handleProcData(PUP::er &p, CmiBool create)
852 {
853     // save readonlys, and callback BTW
854     CkPupROData(p);
855
856     // save mainchares 
857     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
858         
859 #ifndef CMK_CHARE_USE_PTR
860     // save non-migratable chare
861     CkPupChareData(p);
862 #endif
863
864     // save groups into Groups.dat
865     CkPupGroupData(p,create);
866
867     // save nodegroups into NodeGroups.dat
868     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
869 }
870
871 void CkMemCheckPT::sendProcData()
872 {
873   // find out size of buffer
874   int size;
875   {
876     PUP::sizer p;
877     _handleProcData(p,CmiTrue);
878     size = p.size();
879   }
880   int packSize = size;
881   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
882   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
883   {
884     PUP::toMem p(msg->packData);
885     _handleProcData(p,CmiTrue);
886   }
887   msg->pe = CkMyPe();
888   msg->len = size;
889   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
890   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
891 }
892
893 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
894 {
895   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
896   CpvAccess(procChkptBuf) = msg;
897   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
898   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
899 }
900
901 // ArrayElement call this function to give us the checkpointed data
902 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
903 {
904   int len = ckTable.length();
905   int idx;
906   for (idx=0; idx<len; idx++) {
907     CkCheckPTInfo *entry = ckTable[idx];
908     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
909   }
910   CkAssert(idx < len);
911   int isChkpting = msg->cp_flag;
912   ckTable[idx]->updateBuffer(msg);
913   if (isChkpting) {
914       // all my array elements have returned their inmem data
915       // inform starter processor that I am done.
916     recvCount ++;
917     if (recvCount == ckTable.length()) {
918       if (where == CkCheckPoint_inMEM) {
919         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
920       }
921       else if (where == CkCheckPoint_inDISK) {
922         // another barrier for finalize the writing using fsync
923         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
924         contribute(0,NULL,CkReduction::sum_int,localcb);
925       }
926       else
927         CmiAbort("Unknown checkpoint scheme");
928       recvCount = 0;
929     } 
930   }
931 }
932
933 // only used in disk checkpointing
934 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
935 {
936   delete m;
937 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
938   system("sync");
939 #endif
940   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
941 }
942
943 // only is called on cpStarter when checkpoint is done
944 void CkMemCheckPT::cpFinish()
945 {
946   CmiAssert(CkMyPe() == cpStarter);
947   peCount++;
948     // now that all processors have finished, activate callback
949   if (peCount == 2) 
950 {
951     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
952     peCount = 0;
953     thisProxy.report();
954   }
955 }
956
957 // for debugging, report checkpoint info
958 void CkMemCheckPT::report()
959 {
960         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
961         inCheckpointing = 0;
962 #if !CMK_CHKP_ALL
963   int objsize = 0;
964   int len = ckTable.length();
965   for (int i=0; i<len; i++) {
966     CkCheckPTInfo *entry = ckTable[i];
967     CmiAssert(entry);
968     objsize += entry->getSize();
969   }
970   CmiAssert(CpvAccess(procChkptBuf));
971   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
972 #else
973   if(CkMyPe()==0)
974   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
975 #endif
976 }
977
978 /*****************************************************************************
979                         RESTART Procedure
980 *****************************************************************************/
981
982 // master processor of two buddies
983 inline int CkMemCheckPT::isMaster(int buddype)
984 {
985 #if 0
986   int mype = CkMyPe();
987 //CkPrintf("ismaster: %d %d\n", pe, mype);
988   if (CkNumPes() - totalFailed() == 2) {
989     return mype > buddype;
990   }
991   for (int i=1; i<CkNumPes(); i++) {
992     int me = (buddype+i)%CkNumPes();
993     if (isFailed(me)) continue;
994     if (me == mype) return 1;
995     else return 0;
996   }
997   return 0;
998 #else
999     // smaller one
1000   int mype = CkMyPe();
1001 //CkPrintf("ismaster: %d %d\n", pe, mype);
1002   if (CkNumPes() - totalFailed() == 2) {
1003     return mype < buddype;
1004   }
1005 #if NODE_CHECKPOINT
1006   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1007   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1008 #else
1009   for (int i=1; i<CkNumPes(); i++) {
1010 #endif
1011     int me = (mype+i)%CkNumPes();
1012     if (isFailed(me)) continue;
1013     if (me == buddype) return 1;
1014     else return 0;
1015   }
1016   return 0;
1017 #endif
1018 }
1019
1020
1021
1022 #if 0
1023 // helper class to pup all elements that belong to same ckLocMgr
1024 class ElementDestoryer : public CkLocIterator {
1025 private:
1026         CkLocMgr *locMgr;
1027 public:
1028         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1029         void addLocation(CkLocation &loc) {
1030                 CkArrayIndex idx=loc.getIndex();
1031                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1032                 loc.destroy();
1033         }
1034 };
1035 #endif
1036
1037 // restore the bitmap vector for LB
1038 void CkMemCheckPT::resetLB(int diepe)
1039 {
1040 #if CMK_LBDB_ON
1041   int i;
1042   char *bitmap = new char[CkNumPes()];
1043   // set processor available bitmap
1044   get_avail_vector(bitmap);
1045
1046   for (i=0; i<failedPes.length(); i++)
1047     bitmap[failedPes[i]] = 0; 
1048   bitmap[diepe] = 0;
1049
1050 #if CK_NO_PROC_POOL
1051   set_avail_vector(bitmap);
1052 #endif
1053
1054   // if I am the crashed pe, rebuild my failedPEs array
1055   if (CkMyNode() == diepe)
1056     for (i=0; i<CkNumPes(); i++) 
1057       if (bitmap[i]==0) failed(i);
1058
1059   delete [] bitmap;
1060 #endif
1061 }
1062
1063 static double restartT;
1064
1065 // in case when failedPe dies, everybody go through its checkpoint table:
1066 // destory all array elements
1067 // recover lost buddies
1068 // reconstruct all array elements from check point data
1069 // called on all processors
1070 void CkMemCheckPT::restart(int diePe)
1071 {
1072 #if CMK_MEM_CHECKPOINT
1073   double curTime = CmiWallTimer();
1074   if (CkMyPe() == diePe){
1075            restartT = CmiWallTimer();
1076     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1077   }
1078   stage = (char*)"resetLB";
1079   startTime = curTime;
1080   if (CkMyPe() == diePe)
1081     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1082
1083 #if CK_NO_PROC_POOL
1084   failed(diePe);        // add into the list of failed pes
1085 #endif
1086   thisFailedPe = diePe;
1087
1088   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1089
1090   inRestarting = 1;
1091                                                                                 
1092   // disable load balancer's barrier
1093   //if (CkMyPe() != diePe) resetLB(diePe);
1094
1095   CKLOCMGR_LOOP(mgr->startInserting(););
1096
1097
1098   if(CmiNumPartition()==1){
1099         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1100   }else{
1101         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1102         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1103   }
1104 #endif
1105 }
1106
1107 // loally remove all array elements
1108 void CkMemCheckPT::removeArrayElements()
1109 {
1110 #if CMK_MEM_CHECKPOINT
1111   int len = ckTable.length();
1112   double curTime = CmiWallTimer();
1113   if (CkMyPe() == thisFailedPe) 
1114     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1115   stage = (char*)"removeArrayElements";
1116   startTime = curTime;
1117
1118 //  if (cpCallback.isInvalid()) 
1119 //        CkPrintf("invalid pe %d\n",CkMyPe());
1120 //        CkAbort("Didn't set restart callback\n");;
1121   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1122
1123   // get rid of all buffering and remote recs
1124   // including destorying all array elements
1125 #if CK_NO_PROC_POOL  
1126         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1127 #else
1128         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1129 #endif
1130   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1131 #endif
1132 }
1133
1134 // flush state in reduction manager
1135 void CkMemCheckPT::resetReductionMgr()
1136 {
1137   if (CkMyPe() == thisFailedPe) 
1138     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1139   int numGroups = CkpvAccess(_groupIDTable)->size();
1140   for(int i=0;i<numGroups;i++) {
1141     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1142     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1143     obj->flushStates();
1144     obj->ckJustMigrated();
1145   }
1146   // reset again
1147   //CpvAccess(_qd)->flushStates();
1148   if(CmiNumPartition()==1){
1149         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1150   }
1151   else
1152         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1153 }
1154
1155 // recover the lost buddies
1156 void CkMemCheckPT::recoverBuddies()
1157 {
1158   int idx;
1159   int len = ckTable.length();
1160   // ready to flush reduction manager
1161   // cannot be CkMemCheckPT::restart because destory will modify states
1162   double curTime = CmiWallTimer();
1163   if (CkMyPe() == thisFailedPe)
1164   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1165   stage = (char *)"recoverBuddies";
1166   if (CkMyPe() == thisFailedPe)
1167   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1168   startTime = curTime;
1169
1170   // recover buddies
1171   expectCount = 0;
1172 #if !CMK_CHKP_ALL
1173   for (idx=0; idx<len; idx++) {
1174     CkCheckPTInfo *entry = ckTable[idx];
1175     if (entry->pNo == thisFailedPe) {
1176 #if CK_NO_PROC_POOL
1177       // find a new buddy
1178       int budPe = BuddyPE(CkMyPe());
1179 #else
1180       int budPe = thisFailedPe;
1181 #endif
1182       CkArrayCheckPTMessage *msg = entry->getCopy();
1183       msg->bud1 = budPe;
1184       msg->bud2 = CkMyPe();
1185       msg->cp_flag = 0;            // not checkpointing
1186       thisProxy[budPe].recoverEntry(msg);
1187       expectCount ++;
1188     }
1189   }
1190 #else
1191   //send to failed pe
1192   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1193 #if CK_NO_PROC_POOL
1194       // find a new buddy
1195       int budPe = BuddyPE(CkMyPe());
1196 #else
1197       int budPe = thisFailedPe;
1198 #endif
1199       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1200       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1201           msg->cp_flag = 0;            // not checkpointing
1202       msg->bud1 = budPe;
1203       msg->bud2 = CkMyPe();
1204       thisProxy[budPe].recoverEntry(msg);
1205       expectCount ++;
1206   }
1207 #endif
1208
1209   if (expectCount == 0) {
1210           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1211   }
1212   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1213 }
1214
1215 void CkMemCheckPT::gotData()
1216 {
1217   ackCount ++;
1218   if (ackCount == expectCount) {
1219     ackCount = 0;
1220     expectCount = -1;
1221     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1222     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1223   }
1224 }
1225
1226 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1227 {
1228
1229   for (int i=0; i<n; i++) {
1230     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1231     mgr->updateLocation(idx[i], nowOnPe);
1232   }
1233         thisProxy[nowOnPe].gotReply();
1234 }
1235
1236 // restore array elements
1237 void CkMemCheckPT::recoverArrayElements()
1238 {
1239   double curTime = CmiWallTimer();
1240   int len = ckTable.length();
1241   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1242   stage = (char *)"recoverArrayElements";
1243   if (CkMyPe() == thisFailedPe)
1244   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1245   startTime = curTime;
1246  int flag = 0;
1247   // recover all array elements
1248   int count = 0;
1249
1250 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1251   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1252   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1253 #endif
1254
1255 #if !CMK_CHKP_ALL
1256   for (int idx=0; idx<len; idx++)
1257   {
1258     CkCheckPTInfo *entry = ckTable[idx];
1259 #if CK_NO_PROC_POOL
1260     // the bigger one will do 
1261 //    if (CkMyPe() < entry->pNo) continue;
1262     if (!isMaster(entry->pNo)) continue;
1263 #else
1264     // smaller one do it, which has the original object
1265     if (CkMyPe() == entry->pNo+1 || 
1266         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1267 #endif
1268 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1269
1270     entry->updateBuddy(CkMyPe(), entry->pNo);
1271     CkArrayCheckPTMessage *msg = entry->getCopy();
1272     // gzheng
1273     //thisProxy[CkMyPe()].inmem_restore(msg);
1274     inmem_restore(msg);
1275 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1276     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1277     int homePe = mgr->homePe(msg->index);
1278     if (homePe != CkMyPe()) {
1279       gmap[homePe].push_back(msg->locMgr);
1280       imap[homePe].push_back(msg->index);
1281     }
1282 #endif
1283     CkFreeMsg(msg);
1284     count ++;
1285   }
1286 #else
1287         char * packData;
1288         if(CmiNumPartition()==1){
1289                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1290                 packData = (char *)msg->packData;
1291         }
1292         else{
1293                 int pointer = CpvAccess(curPointer);
1294                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1295                 packData = msg->packData;
1296         }
1297 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1298         recoverAll(packData,gmap,imap);
1299 #else
1300         recoverAll(packData);
1301 #endif
1302 #endif
1303   curTime = CmiWallTimer();
1304   //if (CkMyPe() == thisFailedPe)
1305   if (CkMyPe() == 0)
1306         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1307 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1308   for (int i=0; i<CkNumPes(); i++) {
1309     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1310       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1311         flag++; 
1312         }
1313   }
1314   delete [] imap;
1315   delete [] gmap;
1316 #endif
1317   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1318
1319   CKLOCMGR_LOOP(mgr->doneInserting(););
1320
1321   // _crashedNode = -1;
1322   CpvAccess(_crashedNode) = -1;
1323   inRestarting = 0;
1324 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1325   if (CkMyPe() == 0)
1326     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1327 #else
1328 if(flag == 0)
1329 {
1330     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1331 }
1332 #endif
1333 }
1334
1335 void CkMemCheckPT::gotReply(){
1336     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1337 }
1338
1339 void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1340 #if CMK_CHKP_ALL
1341         PUP::fromMem p(packData);
1342         int numElements;
1343         p|numElements;
1344         if(p.isUnpacking()){
1345                 for(int i=0;i<numElements;i++){
1346                         CkGroupID gID;
1347                         CkArrayIndex idx;
1348                         p|gID;
1349                         p|idx;
1350                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1351                         int homePe = mgr->homePe(idx);
1352 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1353                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1354 #else
1355                         if(CmiNumPartition()==1)
1356                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1357                         else{
1358                                 if(CkMyPe()==thisFailedPe){
1359                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1360                                 }
1361                         else{
1362                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1363                                 }
1364                         }
1365 #endif
1366 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1367                         homePe = mgr->homePe(idx);
1368                         if (homePe != CkMyPe()) {
1369                           gmap[homePe].push_back(gID);
1370                           imap[homePe].push_back(idx);
1371                         }
1372 #endif
1373                 }
1374         }
1375 #endif
1376 }
1377
1378
1379 // on every processor
1380 // turn load balancer back on
1381 void CkMemCheckPT::finishUp()
1382 {
1383   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1384   //CKLOCMGR_LOOP(mgr->doneInserting(););
1385   
1386   if (CkMyPe() == thisFailedPe)
1387   {
1388        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1389        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1390   }
1391   CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1392         
1393 #if CMK_CONVERSE_MPI    
1394   if(CmiNumPartition()!=1){
1395         CpvAccess(recvdProcChkp) = 0;
1396         CpvAccess(recvdArrayChkp) = 0;
1397         CpvAccess(curPointer)^=1;
1398         //notify my replica, restart is done
1399    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1400    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1401    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1402   }
1403    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1404          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1405    }
1406 #endif
1407
1408 #if CK_NO_PROC_POOL
1409 #if NODE_CHECKPOINT
1410   int numnodes = CmiNumPhysicalNodes();
1411 #else
1412   int numnodes = CkNumPes();
1413 #endif
1414   if (numnodes-totalFailed() <=2) {
1415     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1416     _memChkptOn = 0;
1417   }
1418 #endif
1419 }
1420
1421 void CkMemCheckPT::recoverFromSoftFailure()
1422 {
1423         inRestarting = 0;
1424         CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1425 }
1426 // called only on 0
1427 void CkMemCheckPT::quiescence(CkCallback &cb)
1428 {
1429   static int pe_count = 0;
1430   pe_count ++;
1431   CmiAssert(CkMyPe() == 0);
1432   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1433   if (pe_count == CkNumPes()) {
1434     pe_count = 0;
1435     cb.send();
1436   }
1437 }
1438
1439 // User callable function - to start a checkpoint
1440 // callback cb is used to pass control back
1441 void CkStartMemCheckpoint(CkCallback &cb)
1442 {
1443 #if CMK_MEM_CHECKPOINT
1444         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1445   if (_memChkptOn == 0) {
1446     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1447     cb.send();
1448     return;
1449   }
1450   if (CkInRestarting()) {
1451       // trying to checkpointing during restart
1452     cb.send();
1453     return;
1454   }
1455     // store user callback and user data
1456   CkMemCheckPT::cpCallback = cb;
1457
1458     // broadcast to start check pointing
1459   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1460   checkptMgr.doItNow(CkMyPe(), cb);
1461 #else
1462   // when mem checkpoint is disabled, invike cb immediately
1463   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1464   cb.send();
1465 #endif
1466 }
1467
1468 void CkRestartCheckPoint(int diePe)
1469 {
1470 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1471   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1472   // broadcast
1473   checkptMgr.restart(diePe);
1474 }
1475
1476 static int _diePE = -1;
1477
1478 // callback function used locally by ccs handler
1479 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1480 {
1481 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1482   CkRestartCheckPoint(_diePE);
1483 }
1484
1485
1486 // called on crashed PE
1487 static void restartBeginHandler(char *msg)
1488 {
1489   CmiFree(msg);
1490 #if CMK_MEM_CHECKPOINT
1491 #if CMK_USE_BARRIER
1492         if(CkMyPe()!=_diePE){
1493                 printf("restar begin on %d\n",CkMyPe());
1494                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1495                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1496                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1497         }else{
1498         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1499         CkRestartCheckPointCallback(NULL, NULL);
1500         }
1501 #else
1502   static int count = 0;
1503   CmiAssert(CkMyPe() == _diePE);
1504   count ++;
1505   if (count == CkNumPes()) {
1506           printf("restart begin on %d\n",CkMyPe());
1507     CkRestartCheckPointCallback(NULL, NULL);
1508     count = 0;
1509   }
1510 #endif
1511 #endif
1512 }
1513
1514 extern void _discard_charm_message();
1515 extern void _resume_charm_message();
1516
1517 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1518         return data;
1519 }
1520
1521 static void restartBcastHandler(char *msg)
1522 {
1523 #if CMK_MEM_CHECKPOINT
1524   // advance phase counter
1525   CkMemCheckPT::inRestarting = 1;
1526   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1527   // gzheng
1528   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1529
1530   if (CkMyPe()==_diePE)
1531     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1532
1533   // reset QD counters
1534 /*  gzheng
1535   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1536 */
1537
1538 /*  gzheng
1539   if (CkMyPe()==_diePE)
1540       CkRestartCheckPointCallback(NULL, NULL);
1541 */
1542   CmiFree(msg);
1543
1544   _resume_charm_message();
1545
1546     // reduction
1547   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1548   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1549 #if CMK_USE_BARRIER
1550         //CmiPrintf("before reduce\n"); 
1551         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1552         //CmiPrintf("after reduce\n");  
1553 #else
1554   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1555 #endif 
1556  checkpointed = 0;
1557 #endif
1558 }
1559
1560 extern void _initDone();
1561
1562 bool compare(char * buf1, char *buf2){
1563         //buf1 my copy, buf2 from another one 
1564 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1565         PUP::checker pchecker(buf1,buf2);
1566         pchecker.skip();
1567         
1568         int numElements;
1569         pchecker|numElements;
1570 //      CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1571         for(int i=0;i<numElements;i++){
1572         //for(int i=0;i<1;i++){
1573                 CkGroupID gID;
1574                 CkArrayIndex idx;
1575                 
1576                 pchecker|gID;
1577                 pchecker|idx;
1578                 
1579 //              CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1580                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1581                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1582 //              CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1583         }
1584         return pchecker.getResult();
1585 }
1586
1587 static void recvRemoteChkpHandler(char *msg){
1588    envelope *env = (envelope *)msg;
1589    CkUnpackMessage(&env);
1590    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1591   if(CpvAccess(recvdLocal)==1){
1592           int pointer = CpvAccess(curPointer);
1593           int size = CpvAccess(chkpBuf)[pointer]->len;
1594           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1595           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1596                         checkptMgr[CkMyPe()].doneComparison(true);
1597           }else
1598           {
1599                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1600                         checkptMgr[CkMyPe()].doneComparison(false);
1601           }
1602   }else{
1603           CpvAccess(recvdRemote) = 1;
1604           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1605           CpvAccess(buddyBuf) = chkpMsg;
1606   }  
1607 }
1608
1609 static void replicaRecoverHandler(char *msg){
1610         CpvAccess(_remoteCrashedNode) = -1;
1611         CkMemCheckPT::replicaAlive = 1;
1612     bool ret = true;
1613     CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1614 }
1615
1616 static void replicaDieHandler(char * msg){
1617 #if CMK_CONVERSE_MPI    
1618         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1619         CpvAccess(_remoteCrashedNode) = diePe;
1620         CkMemCheckPT::replicaAlive = 0;
1621         find_spare_mpirank(diePe,CmiMyPartition()^1);
1622     if(CkMyPe()==diePe){
1623       CkPrintf("pe %d in replicad word die\n",diePe);
1624             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1625     }
1626 #endif
1627         //broadcast to my partition to get local max iter
1628     if(CkMyPe()==diePe){
1629                 CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1630                 checkptMgr.getIter();
1631         }
1632 }
1633
1634
1635 static void replicaDieBcastHandler(char *msg){
1636         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1637         CpvAccess(_remoteCrashedNode) = diePe;
1638         CkMemCheckPT::replicaAlive = 0;
1639 }
1640
1641 static void recoverRemoteProcDataHandler(char *msg){
1642    envelope *env = (envelope *)msg;
1643    CkUnpackMessage(&env);
1644    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1645         
1646    //store the checkpoint
1647         int pointer = procMsg->pointer;
1648
1649
1650    if(CkMyPe()==CpvAccess(_crashedNode)){
1651            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1652            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1653            PUP::fromMem p(procMsg->packData);
1654            _handleProcData(p,CmiTrue);
1655            _initDone();
1656    }
1657    else{
1658            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1659            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1660            //_handleProcData(p,CmiFalse);
1661    }
1662    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1663    CKLOCMGR_LOOP(mgr->startInserting(););
1664    
1665    CpvAccess(recvdProcChkp) =1;
1666         if(CpvAccess(recvdArrayChkp)==1){
1667                 _resume_charm_message();
1668                 _diePE = CpvAccess(_crashedNode);
1669                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1670                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1671                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1672         }
1673 }
1674
1675 static void recoverRemoteArrayDataHandler(char *msg){
1676    envelope *env = (envelope *)msg;
1677    CkUnpackMessage(&env);
1678    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1679         
1680    //store the checkpoint
1681         int pointer = chkpMsg->pointer;
1682         CpvAccess(curPointer) = pointer;
1683         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1684         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1685    CpvAccess(recvdArrayChkp) =1;
1686         CkMemCheckPT::inRestarting = 1;
1687         if(CpvAccess(recvdProcChkp) == 1){
1688                 _resume_charm_message();
1689                 _diePE = CpvAccess(_crashedNode);
1690                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1691                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1692                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1693                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1694         }
1695 }
1696
1697 static void recvPhaseHandler(char * msg)
1698 {
1699         CpvAccess(_curRestartPhase)--;
1700         CkMemCheckPT::inRestarting = 1;
1701         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1702   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1703   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1704   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1705 //              CmiPrintf("start ping check handler\n");
1706 //       }
1707         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1708   // }
1709    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1710    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1711
1712 }
1713 // called on crashed processor
1714 static void recoverProcDataHandler(char *msg)
1715 {
1716 #if CMK_MEM_CHECKPOINT
1717    int i;
1718    envelope *env = (envelope *)msg;
1719    CkUnpackMessage(&env);
1720    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1721    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1722    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1723    PUP::fromMem p(procMsg->packData);
1724    _handleProcData(p,CmiTrue);
1725
1726    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1727    // gzheng
1728    CKLOCMGR_LOOP(mgr->startInserting(););
1729
1730    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1731    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1732    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1733    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1734
1735    _initDone();
1736 //   CpvAccess(_qd)->flushStates();
1737    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1738 #endif
1739 }
1740 //for replica, got the phase number from my neighbor processor in the current partition
1741 static void askPhaseHandler(char *msg)
1742 {
1743 #if CMK_MEM_CHECKPOINT
1744         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1745     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1746         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1747     CmiSetHandler(msg, recvPhaseHandlerIdx);
1748         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1749 #endif
1750 }
1751 // called on its backup processor
1752 // get backup message buffer and sent to crashed processor
1753 static void askProcDataHandler(char *msg)
1754 {
1755 #if CMK_MEM_CHECKPOINT
1756     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1757     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1758     if (CpvAccess(procChkptBuf) == NULL)  {
1759       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1760       CkAbort("no checkpoint found");
1761     }
1762     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1763     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1764
1765     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1766
1767     CkPackMessage(&env);
1768     CmiSetHandler(env, recoverProcDataHandlerIdx);
1769     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1770     CpvAccess(procChkptBuf) = NULL;
1771     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1772 #endif
1773 }
1774
1775 // called on PE 0
1776 void qd_callback(void *m)
1777 {
1778    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1779    CkFreeMsg(m);
1780    if(CmiNumPartition()==1){
1781 #ifdef CMK_SMP
1782            for(int i=0;i<CmiMyNodeSize();i++){
1783            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1784            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1785                 CmiSetHandler(msg, askProcDataHandlerIdx);
1786                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1787                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1788            }
1789            return;
1790 #endif
1791            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1792            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1793            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1794            CmiSetHandler(msg, askProcDataHandlerIdx);
1795            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1796            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1797         }
1798    else{
1799                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1800                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1801                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1802    }
1803 }
1804
1805 // on crashed node
1806 void CkMemRestart(const char *dummy, CkArgMsg *args)
1807 {
1808 #if CMK_MEM_CHECKPOINT
1809    _diePE = CmiMyNode();
1810    CpvAccess( _crashedNode )= CmiMyNode();
1811    CkMemCheckPT::inRestarting = 1;
1812    _discard_charm_message();
1813   
1814   /*if(CmiMyRank()==0){
1815     CkCallback cb(qd_callback);
1816     CkStartQD(cb);
1817     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1818   }*/
1819    if(CmiNumPartition()==1){
1820            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1821                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1822                 restartT = CmiWallTimer();
1823            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1824            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1825            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1826            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1827            CmiSetHandler(msg, askProcDataHandlerIdx);
1828            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1829            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1830    }
1831    else{
1832                 CkCallback cb(qd_callback);
1833                 CkStartQD(cb);
1834    }
1835 #else
1836    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1837 #endif
1838 }
1839
1840 // can be called in other files
1841 // return true if it is in restarting
1842 extern "C"
1843 int CkInRestarting()
1844 {
1845 #if CMK_MEM_CHECKPOINT
1846   if (CpvAccess( _crashedNode)!=-1) return 1;
1847   // gzheng
1848   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1849   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1850   return CkMemCheckPT::inRestarting;
1851 #else
1852   return 0;
1853 #endif
1854 }
1855
1856 extern "C"
1857 int CkReplicaAlive()
1858 {
1859 //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1860 //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1861         return CkMemCheckPT::replicaAlive;
1862
1863         /*if(CkMemCheckPT::replicaDead==1)
1864                 return 0;
1865         else            
1866                 return 1;*/
1867 }
1868
1869 extern "C"
1870 int CkInCheckpointing()
1871 {
1872         return CkMemCheckPT::inCheckpointing;
1873 }
1874
1875 extern "C"
1876 void CkSetInLdb(){
1877 #if CMK_MEM_CHECKPOINT
1878         CkMemCheckPT::inLoadbalancing = 1;
1879 #endif
1880 }
1881
1882 extern "C"
1883 int CkInLdb(){
1884 #if CMK_MEM_CHECKPOINT
1885         return CkMemCheckPT::inLoadbalancing;
1886 #endif
1887         return 0;
1888 }
1889
1890 extern "C"
1891 void CkResetInLdb(){
1892 #if CMK_MEM_CHECKPOINT
1893         CkMemCheckPT::inLoadbalancing = 0;
1894 #endif
1895 }
1896
1897 /*****************************************************************************
1898                 module initialization
1899 *****************************************************************************/
1900
1901 static int arg_where = CkCheckPoint_inMEM;
1902
1903 #if CMK_MEM_CHECKPOINT
1904 void init_memcheckpt(char **argv)
1905 {
1906     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1907       arg_where = CkCheckPoint_inDISK;
1908     }
1909         // initiliazing _crashedNode variable
1910         CpvInitialize(int, _crashedNode);
1911         CpvInitialize(int, _remoteCrashedNode);
1912         CpvAccess(_crashedNode) = -1;
1913         CpvAccess(_remoteCrashedNode) = -1;
1914         init_FI(argv);
1915
1916 }
1917 #endif
1918
1919 class CkMemCheckPTInit: public Chare {
1920 public:
1921   CkMemCheckPTInit(CkArgMsg *m) {
1922 #if CMK_MEM_CHECKPOINT
1923     if (arg_where == CkCheckPoint_inDISK) {
1924       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1925     }
1926     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1927     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1928 #endif
1929   }
1930 };
1931
1932 static void notifyHandler(char *msg)
1933 {
1934 #if CMK_MEM_CHECKPOINT
1935   CmiFree(msg);
1936       /* immediately increase restart phase to filter old messages */
1937   CpvAccess(_curRestartPhase) ++;
1938   CpvAccess(_qd)->flushStates();
1939   _discard_charm_message();
1940
1941 #endif
1942 }
1943
1944 extern "C"
1945 void notify_crash(int node)
1946 {
1947 #ifdef CMK_MEM_CHECKPOINT
1948   CpvAccess( _crashedNode) = node;
1949 #ifdef CMK_SMP
1950   for(int i=0;i<CkMyNodeSize();i++){
1951         CpvAccessOther(_crashedNode,i)=node;
1952   }
1953 #endif
1954   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1955   CkMemCheckPT::inRestarting = 1;
1956
1957     // this may be in interrupt handler, send a message to reset QD
1958   int pe = CmiNodeFirst(CkMyNode());
1959   for(int i=0;i<CkMyNodeSize();i++){
1960         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1961         CmiSetHandler(msg, notifyHandlerIdx);
1962         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1963   }
1964 #endif
1965 }
1966
1967 extern "C" void (*notify_crash_fn)(int node);
1968
1969
1970 #if CMK_CONVERSE_MPI
1971 void buddyDieHandler(char *msg)
1972 {
1973 #if CMK_MEM_CHECKPOINT
1974    // notify
1975         CkMemCheckPT::inRestarting = 1;
1976    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1977    notify_crash(diepe);
1978    // send message to crash pe to let it restart
1979    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1980    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1981    int buddy = obj->BuddyPE(CmiMyPe());
1982    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1983    if (buddy == diepe)  {
1984      mpi_restart_crashed(diepe, newrank);
1985    }
1986 #endif
1987 }
1988
1989 void pingHandler(void *msg)
1990 {
1991   lastPingTime = CmiWallTimer();
1992   CmiFree(msg);
1993 }
1994
1995 void pingCheckHandler()
1996 {
1997 #if CMK_MEM_CHECKPOINT
1998   double now = CmiWallTimer();
1999   if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2000   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2001     int i, pe, buddy;
2002     // tell everyone the buddy dies
2003     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2004     for (i = 1; i < CmiNumPes(); i++) {
2005        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2006        if (obj->BuddyPE(pe) == CmiMyPe()) break;
2007     }
2008     buddy = pe;
2009     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2010     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
2011       if (obj->isFailed(pe) || pe == buddy) continue;
2012       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2013       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2014       CmiSetHandler(msg, buddyDieHandlerIdx);
2015       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2016     }*/
2017     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2018     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2019     CmiSetHandler(msg, buddyDieHandlerIdx);
2020     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2021     //send to everyone in the other world
2022         if(CmiNumPartition()!=1){
2023                 for(int i=0;i<CmiNumPes();i++){
2024                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2025                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2026                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
2027                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2028                 }
2029         }
2030   }
2031   else 
2032     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2033 #endif
2034 }
2035
2036 void pingBuddy()
2037 {
2038 #if CMK_MEM_CHECKPOINT
2039   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2040   if (obj) {
2041     int buddy = obj->BuddyPE(CkMyPe());
2042     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2043     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2044     CmiSetHandler(msg, pingHandlerIdx);
2045     CmiGetRestartPhase(msg) = 9999;
2046     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2047   }
2048   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2049 #endif
2050 }
2051 #endif
2052
2053 // initproc
2054 void CkRegisterRestartHandler( )
2055 {
2056 #if CMK_MEM_CHECKPOINT
2057   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2058   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2059   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2060   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2061   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2062   
2063   //for replica
2064   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2065   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2066   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2067   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2068   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2069   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2070   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2071   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2072
2073 #if CMK_CONVERSE_MPI
2074   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2075   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2076   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2077 #endif
2078
2079   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2080   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2081   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2082   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2083   CpvInitialize(int,curPointer);
2084   CpvInitialize(int,recvdLocal);
2085   CpvInitialize(int,recvdRemote);
2086   CpvInitialize(int,recvdProcChkp);
2087   CpvInitialize(int,recvdArrayChkp);
2088
2089   CpvAccess(procChkptBuf) = NULL;
2090   CpvAccess(buddyBuf) = NULL;
2091   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2092   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2093   CpvAccess(chkpBuf)[0] = NULL;
2094   CpvAccess(chkpBuf)[1] = NULL;
2095   CpvAccess(localProcChkpBuf)[0] = NULL;
2096   CpvAccess(localProcChkpBuf)[1] = NULL;
2097   
2098   CpvAccess(curPointer) = 0;
2099   CpvAccess(recvdLocal) = 0;
2100   CpvAccess(recvdRemote) = 0;
2101   CpvAccess(recvdProcChkp) = 0;
2102   CpvAccess(recvdArrayChkp) = 0;
2103
2104   notify_crash_fn = notify_crash;
2105
2106 #if ! CMK_CONVERSE_MPI
2107   // print pid to kill
2108 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2109 //  sleep(4);
2110 #endif
2111 #endif
2112 }
2113
2114
2115 extern "C"
2116 int CkHasCheckpoints()
2117 {
2118   return checkpointed;
2119 }
2120
2121 /// @todo: the following definitions should be moved to a separate file containing
2122 // structures and functions about fault tolerance strategies
2123
2124 /**
2125  *  * @brief: function for killing a process                                             
2126  *   */
2127 #ifdef CMK_MEM_CHECKPOINT
2128 #if CMK_HAS_GETPID
2129 void killLocal(void *_dummy,double curWallTime){
2130         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2131         if(CmiWallTimer()<killTime-1){
2132                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2133         }else{ 
2134 #if CMK_CONVERSE_MPI
2135                                 CkDieNow();
2136 #else 
2137                 kill(getpid(),SIGKILL);                                               
2138 #endif
2139         }              
2140
2141 #else
2142 void killLocal(void *_dummy,double curWallTime){
2143   CmiAbort("kill() not supported!");
2144 }
2145 #endif
2146 #endif
2147
2148 #ifdef CMK_MEM_CHECKPOINT
2149 /**
2150  * @brief: reads the file with the kill information
2151  */
2152 void readKillFile(){
2153         FILE *fp=fopen(killFile,"r");
2154         if(!fp){
2155                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2156                 return;
2157         }
2158         int proc;
2159         double sec;
2160         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2161                 if(proc == CkMyNode() && CkMyRank() == 0){
2162                         killTime = CmiWallTimer()+sec;
2163                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2164                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2165                 }
2166         }
2167         fclose(fp);
2168 }
2169
2170 #if ! CMK_CONVERSE_MPI
2171 void CkDieNow()
2172 {
2173 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2174          // ignored for non-mpi version
2175         CmiPrintf("[%d] die now.\n", CmiMyPe());
2176         killTime = CmiWallTimer()+0.001;
2177         CcdCallFnAfter(killLocal,NULL,1);
2178 #endif
2179 }
2180 #endif
2181
2182 #endif
2183
2184 #include "CkMemCheckpoint.def.h"
2185
2186