fix a bug
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117 }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, localChkpDone);
131 CpvDeclare(int, remoteChkpDone);
132 CpvDeclare(int, recvdArrayChkp);
133 CpvDeclare(int, recvdProcChkp);
134
135 bool compare(char * buf1, char * buf2);
136 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
137 // Converse function handles
138 static int askPhaseHandlerIdx;
139 static int recvPhaseHandlerIdx;
140 static int askProcDataHandlerIdx;
141 static int restartBcastHandlerIdx;
142 static int recoverProcDataHandlerIdx;
143 static int restartBeginHandlerIdx;
144 static int recvRemoteChkpHandlerIdx;
145 static int replicaDieHandlerIdx;
146 static int replicaDieBcastHandlerIdx;
147 static int replicaRecoverHandlerIdx;
148 static int replicaChkpDoneHandlerIdx;
149 static int recoverRemoteProcDataHandlerIdx;
150 static int recoverRemoteArrayDataHandlerIdx;
151 static int notifyHandlerIdx;
152 // compute the backup processor
153 // FIXME: avoid crashed processors
154 #if CMK_CONVERSE_MPI
155 static int pingHandlerIdx;
156 static int pingCheckHandlerIdx;
157 static int buddyDieHandlerIdx;
158 static double lastPingTime = -1;
159
160 extern "C" void mpi_restart_crashed(int pe, int rank);
161 extern "C" int  find_spare_mpirank(int pe,int partition);
162
163 void pingBuddy();
164 void pingCheckHandler();
165
166 #endif
167 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
168
169 inline int CkMemCheckPT::BuddyPE(int pe)
170 {
171   int budpe;
172 #if NODE_CHECKPOINT
173   // buddy is the processor with same rank on the next physical node
174   int r1 = CmiPhysicalRank(pe);
175   int budnode = CmiPhysicalNodeID(pe);
176   do {
177     budnode = (budnode+1)%CmiNumPhysicalNodes();
178     int *pelist;
179     int num;
180     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
181     budpe = pelist[r1 % num];
182   } while (isFailed(budpe));
183   if (budpe == pe) {
184     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
185     CmiAbort("Failed to find a buddy processor");
186   }
187 #else
188   budpe = pe;
189   budpe = (budpe+1)%CkNumPes();
190 #endif
191   return budpe;
192 }
193
194 // called in array element constructor
195 // choose and register with 2 buddies for checkpoiting 
196 #if CMK_MEM_CHECKPOINT
197 void ArrayElement::init_checkpt() {
198   if (_memChkptOn == 0) return;
199   if (CkInRestarting()) {
200     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
201   }
202   // only master init checkpoint
203   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
204
205   budPEs[0] = CkMyPe();
206   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
207   CmiAssert(budPEs[0] != budPEs[1]);
208   // inform checkPTMgr
209   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
210   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
211   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
212   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
213 }
214 #endif
215
216 // entry function invoked by checkpoint mgr asking for checkpoint data
217 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
218 #if CMK_MEM_CHECKPOINT
219   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
220   //char index[128];   thisIndexMax.sprint(index);
221   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
222   CkLocMgr *locMgr = thisArray->getLocMgr();
223   CmiAssert(myRec!=NULL);
224   int size;
225   {
226     PUP::sizer p;
227     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
228     size = p.size();
229   }
230   int packSize = size/sizeof(double) +1;
231   CkArrayCheckPTMessage *msg =
232     new (packSize, 0) CkArrayCheckPTMessage;
233   msg->len = size;
234   msg->index =thisIndexMax;
235   msg->aid = thisArrayID;
236   msg->locMgr = locMgr->getGroupID();
237   msg->cp_flag = 1;
238   {
239     PUP::toMem p(msg->packData);
240     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
241   }
242
243   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
244   checkptMgr.recvData(msg, 2, budPEs);
245   delete m;
246 #endif
247 }
248
249 // checkpoint holder class - for memory checkpointing
250 class CkMemCheckPTInfo: public CkCheckPTInfo
251 {
252   CkArrayCheckPTMessage *ckBuffer;
253   public:
254   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
255     CkCheckPTInfo(a, loc, idx, pno)
256   {
257     ckBuffer = NULL;
258   }
259   ~CkMemCheckPTInfo() 
260   {
261     if (ckBuffer) delete ckBuffer; 
262   }
263   inline void updateBuffer(CkArrayCheckPTMessage *data) 
264   {
265     CmiAssert(data!=NULL);
266     if (ckBuffer) delete ckBuffer;
267     ckBuffer = data;
268   }    
269   inline CkArrayCheckPTMessage * getCopy()
270   {
271     if (ckBuffer == NULL) {
272       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
273       CmiAbort("Abort!");
274     }
275     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
276   }     
277   inline void updateBuddy(int b1, int b2) {
278     CmiAssert(ckBuffer);
279     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
280     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
281     CmiAssert(pNo != CkMyPe());
282   }
283   inline int getSize() { 
284     CmiAssert(ckBuffer);
285     return ckBuffer->len; 
286   }
287 };
288
289 // checkpoint holder class - for in-disk checkpointing
290 class CkDiskCheckPTInfo: public CkCheckPTInfo 
291 {
292   char *fname;
293   int bud1, bud2;
294   int len;                      // checkpoint size
295   public:
296   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
297   {
298 #if CMK_USE_MKSTEMP
299     fname = new char[64];
300     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
301     mkstemp(fname);
302 #else
303     fname=tmpnam(NULL);
304 #endif
305     bud1 = bud2 = -1;
306     len = 0;
307   }
308   ~CkDiskCheckPTInfo() 
309   {
310     remove(fname);
311   }
312   inline void updateBuffer(CkArrayCheckPTMessage *data) 
313   {
314     double t = CmiWallTimer();
315     // unpack it
316     envelope *env = UsrToEnv(data);
317     CkUnpackMessage(&env);
318     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
319     FILE *f = fopen(fname,"wb");
320     PUP::toDisk p(f);
321     CkPupMessage(p, (void **)&data);
322     // delay sync to the end because otherwise the messages are blocked
323     //    fsync(fileno(f));
324     fclose(f);
325     bud1 = data->bud1;
326     bud2 = data->bud2;
327     len = data->len;
328     delete data;
329     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
330   }
331   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
332   {
333     CkArrayCheckPTMessage *data;
334     FILE *f = fopen(fname,"rb");
335     PUP::fromDisk p(f);
336     CkPupMessage(p, (void **)&data);
337     fclose(f);
338     data->bud1 = bud1;                          // update the buddies
339     data->bud2 = bud2;
340     return data;
341   }
342   inline void updateBuddy(int b1, int b2) {
343     bud1 = b1; bud2 = b2;
344     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
345     CmiAssert(pNo != CkMyPe());
346   }
347   inline int getSize() { 
348     return len; 
349   }
350 };
351
352 CkMemCheckPT::CkMemCheckPT(int w)
353 {
354   int numnodes = 0;
355 #if NODE_CHECKPOINT
356   numnodes = CmiNumPhysicalNodes();
357 #else
358   numnodes = CkNumPes();
359 #endif
360 #if CK_NO_PROC_POOL
361   if (numnodes <= 2)
362 #else
363     if (numnodes  == 1)
364 #endif
365     {
366       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
367       _memChkptOn = 0;
368     }
369   inRestarting = 0;
370   inCheckpointing = 0;
371   recvCount = peCount = 0;
372   ackCount = 0;
373   expectCount = -1;
374   where = w;
375   replicaAlive = 1;
376   notifyReplica = 0;
377 #if CMK_CONVERSE_MPI
378   void pingBuddy();
379   void pingCheckHandler();
380   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
381   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
382 #endif
383   chkpTable[0] = NULL;
384   chkpTable[1] = NULL;
385   maxIter = -1;
386   recvIterCount = 0;
387   localDecided = false;
388 }
389
390 CkMemCheckPT::~CkMemCheckPT()
391 {
392   int len = ckTable.length();
393   for (int i=0; i<len; i++) {
394     delete ckTable[i];
395   }
396 }
397
398 void CkMemCheckPT::pup(PUP::er& p) 
399
400   CBase_CkMemCheckPT::pup(p); 
401   p|cpStarter;
402   p|thisFailedPe;
403   p|failedPes;
404   p|ckCheckPTGroupID;           // recover global variable
405   p|cpCallback;                 // store callback
406   p|where;                      // where to checkpoint
407   p|peCount;
408   if (p.isUnpacking()) {
409     recvCount = 0;
410 #if CMK_CONVERSE_MPI
411     void pingBuddy();
412     void pingCheckHandler();
413     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
414     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
415 #endif
416     maxIter = -1;
417     recvIterCount = 0;
418     localDecided = false;
419   }
420 }
421
422 void CkMemCheckPT::getIter(){
423   localDecided = true;
424   localMaxIter = maxIter+1;
425   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
426   int elemCount = CkCountChkpSyncElements();
427   if(elemCount == 0){
428     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
429   }
430 }
431
432 //when one replica failes, decide the next chkp iter
433
434 void CkMemCheckPT::recvIter(int iter){
435   if(maxIter<iter){
436     maxIter=iter;
437   }
438 }
439
440 void CkMemCheckPT::recvMaxIter(int iter){
441   localDecided = false;
442   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
443 }
444
445 void CkMemCheckPT::reachChkpIter(){
446   recvIterCount++;
447   elemCount = CkCountChkpSyncElements();
448   if(recvIterCount == elemCount){
449     recvIterCount = 0;
450     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
451   }
452 }
453
454 void CkMemCheckPT::startChkp(){
455   CkPrintf("start checkpoint\n");
456   CkStartMemCheckpoint(cpCallback);
457 }
458
459 // called by checkpoint mgr to restore an array element
460 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
461 {
462 #if CMK_MEM_CHECKPOINT
463   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
464   // m->index.print();
465   PUP::fromMem p(m->packData);
466   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
467   CmiAssert(mgr);
468 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
469   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
470 #else
471   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
472 #endif
473
474   // find a list of array elements bound together
475   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
476   CmiAssert(elt);
477   CkLocRec_local *rec = elt->myRec;
478   CkVec<CkMigratable *> list;
479   mgr->migratableList(rec, list);
480   CmiAssert(list.length() > 0);
481   for (int l=0; l<list.length(); l++) {
482     elt = (ArrayElement *)list[l];
483     elt->budPEs[0] = m->bud1;
484     elt->budPEs[1] = m->bud2;
485     //    reset, may not needed now
486     // for now.
487     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
488       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
489       if (c) c->redNo = 0;
490     }
491   }
492 #endif
493 }
494
495 // return 1 if pe is a crashed processor
496 int CkMemCheckPT::isFailed(int pe)
497 {
498   for (int i=0; i<failedPes.length(); i++)
499     if (failedPes[i] == pe) return 1;
500   return 0;
501 }
502
503 // add pe into history list of all failed processors
504 void CkMemCheckPT::failed(int pe)
505 {
506   if (isFailed(pe)) return;
507   failedPes.push_back(pe);
508 }
509
510 int CkMemCheckPT::totalFailed()
511 {
512   return failedPes.length();
513 }
514
515 // create an checkpoint entry for array element of aid with index.
516 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
517 {
518   // error check, no duplicate
519   int idx, len = ckTable.size();
520   for (idx=0; idx<len; idx++) {
521     CkCheckPTInfo *entry = ckTable[idx];
522     if (index == entry->index) {
523       if (loc == entry->locMgr) {
524         // bindTo array elements
525         return;
526       }
527       // for array inheritance, the following check may fail
528       // because ArrayElement constructor of all superclasses are called
529       if (aid == entry->aid) {
530         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
531         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
532       }
533     }
534   }
535   CkCheckPTInfo *newEntry;
536   if (where == CkCheckPoint_inMEM)
537     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
538   else
539     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
540   ckTable.push_back(newEntry);
541   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
542 }
543
544 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
545 {
546 #if !CMK_CHKP_ALL       
547   int buddy = msg->bud1;
548   if (buddy == CkMyPe()) buddy = msg->bud2;
549   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
550   recvData(msg);
551   // ack
552   thisProxy[buddy].gotData();
553 #else
554   chkpTable[0] = NULL;
555   chkpTable[1] = NULL;
556   recvArrayCheckpoint(msg);
557   thisProxy[msg->bud2].gotData();
558 #endif
559 }
560
561 // loop through my checkpoint table and ask checkpointed array elements
562 // to send me checkpoint data.
563 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
564 {
565   checkpointed = 1;
566   cpCallback = cb;
567   cpStarter = starter;
568   inCheckpointing = 1;
569   if (CkMyPe() == cpStarter) {
570     startTime = CmiWallTimer();
571     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
572   }
573 #if CMK_CONVERSE_MPI
574   if(CmiNumPartition()==1)
575 #endif    
576   {
577
578 #if !CMK_CHKP_ALL
579     int len = ckTable.length();
580     for (int i=0; i<len; i++) {
581       CkCheckPTInfo *entry = ckTable[i];
582       // always let the bigger number processor send request
583       //if (CkMyPe() < entry->pNo) continue;
584       // always let the smaller number processor send request, may on same proc
585       if (!isMaster(entry->pNo)) continue;
586       // call inmem_checkpoint to the array element, ask it to send
587       // back checkpoint data via recvData().
588       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
589       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
590     }
591     // if my table is empty, then I am done
592     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
593 #else
594     startArrayCheckpoint();
595 #endif
596     sendProcData();
597   }
598 #if CMK_CONVERSE_MPI
599   else
600   {
601     startCheckpoint();
602   }
603 #endif
604   // pack and send proc level data
605 }
606
607 class MemElementPacker : public CkLocIterator{
608   private:
609     CkLocMgr *locMgr;
610     PUP::er &p;
611   public:
612     MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
613     void addLocation(CkLocation &loc){
614       CkArrayIndexMax idx = loc.getIndex();
615       CkGroupID gID = locMgr->ckGetGroupID();
616       p|gID;
617       p|idx;
618       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
619     }
620 };
621
622 void pupAllElements(PUP::er &p){
623 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
624   int numElements;
625   if(!p.isUnpacking()){
626     numElements = CkCountArrayElements();
627   }
628   p | numElements;
629   if(!p.isUnpacking()){
630     CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
631   }
632 #endif
633 }
634
635 void CkMemCheckPT::startArrayCheckpoint(){
636 #if CMK_CHKP_ALL
637   int size;
638   {
639     PUP::sizer psizer;
640     pupAllElements(psizer);
641     size = psizer.size();
642   }
643   int packSize = size/sizeof(double)+1;
644   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
645   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
646   msg->len = size;
647   msg->cp_flag = 1;
648   int budPEs[2];
649   msg->bud1=CkMyPe();
650   msg->bud2=ChkptOnPe(CkMyPe());
651   {
652     PUP::toMem p(msg->packData);
653     pupAllElements(p);
654   }
655   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
656   if(chkpTable[0]) delete chkpTable[0];
657   chkpTable[0] = msg;
658   //send the checkpoint to my 
659   recvCount++;
660 #endif
661 }
662
663 void CkMemCheckPT::startCheckpoint(){
664 #if CMK_CONVERSE_MPI
665   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
666     CkPrintf("in start checkpointing!!!!\n");
667   int size;
668   {
669     PUP::sizer p;
670     _handleProcData(p,CmiFalse);
671     size = p.size();
672   }
673   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
674   procMsg->len = size;
675   procMsg->cp_flag = 1;
676   {
677     PUP::toMem p(procMsg->packData);
678     _handleProcData(p,CmiFalse);
679   }
680   int pointer = CpvAccess(curPointer);
681   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
682   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
683
684   {
685     PUP::sizer psizer;
686     pupAllElements(psizer);
687     size = psizer.size();
688   }
689   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
690   msg->len = size;
691   msg->cp_flag = 1;
692   {
693     PUP::toMem p(msg->packData);
694     pupAllElements(p);
695   }
696   pointer = CpvAccess(curPointer);
697   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
698   CpvAccess(chkpBuf)[pointer] = msg;
699   if(CkMyPe()==0)
700     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
701   if(CkReplicaAlive()==1){
702     CpvAccess(recvdLocal) = 1;
703     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
704     CkPackMessage(&env);
705     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
706     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
707   }
708   if(CpvAccess(recvdRemote)==1){
709     //compare the checkpoint 
710     int size = CpvAccess(chkpBuf)[pointer]->len;
711     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
712       thisProxy[CkMyPe()].doneComparison(true);
713     }else{
714       CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
715       thisProxy[CkMyPe()].doneComparison(false);
716     }
717     if(CkMyPe()==0)
718       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
719   }
720   else{
721     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
722       { 
723         int pointer = CpvAccess(curPointer);
724         //send the proc data
725         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
726         procMsg->pointer = pointer;
727         envelope * env = (envelope *)(UsrToEnv(procMsg));
728         CkPackMessage(&env);
729         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
730         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
731         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
732           CkPrintf("[%d] sendProcdata\n",CkMyPe());
733         }
734       }
735       //send the array checkpoint data
736       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
737       msg->pointer = CpvAccess(curPointer);
738       envelope * env = (envelope *)(UsrToEnv(msg));
739       CkPackMessage(&env);
740       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
741       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
742       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
743         CkPrintf("[%d] sendArraydata\n",CkMyPe());
744       //can continue work, no need to wait for my replica
745     }
746   }
747 #endif
748 }
749
750 void CkMemCheckPT::doneComparison(bool ret){
751   int _ret = 1;
752   if(!ret){
753     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
754     _ret = 0;
755   }else{
756     _ret = 1;
757   }
758   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
759   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
760 }
761
762 void CkMemCheckPT::doneRComparison(int ret){
763   //    if(CpvAccess(curPointer) == 0){
764   if(ret==CkNumPes()){
765     CpvAccess(localChkpDone) = 1;
766     if(CpvAccess(remoteChkpDone) ==1){
767       thisProxy.doneBothComparison();
768     }
769     if(notifyReplica == 0){
770       //notify the replica am done
771       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
772       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
773       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
774       notifyReplica = 1;
775     }
776   }
777   else{
778     CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
779     thisProxy.RollBack();
780   }
781 }
782
783 void CkMemCheckPT::doneBothComparison(){
784   CpvAccess(recvdRemote) = 0;
785   CpvAccess(recvdLocal) = 0;
786   CpvAccess(localChkpDone) = 0;
787   CpvAccess(remoteChkpDone) = 0;
788   CpvAccess(curPointer)^=1;
789   inCheckpointing = 0;
790   notifyReplica = 0;
791   if(CkMyPe() == 0){
792     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
793   }
794   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
795 }
796
797 void CkMemCheckPT::RollBack(){
798   //restore group data
799   checkpointed = 0;
800   CkMemCheckPT::inRestarting = 1;
801   int pointer = CpvAccess(curPointer)^1;//use the previous one
802   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
803   PUP::fromMem p(chkpMsg->packData);    
804
805   //destroy array elements
806   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
807   int numGroups = CkpvAccess(_groupIDTable)->size();
808   for(int i=0;i<numGroups;i++) {
809     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
810     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
811     obj->flushStates();
812     obj->ckJustMigrated();
813   }
814   //restore array elements
815
816   int numElements;
817   p|numElements;
818
819   if(p.isUnpacking()){
820     for(int i=0;i<numElements;i++){
821       //for(int i=0;i<1;i++){
822       CkGroupID gID;
823       CkArrayIndex idx;
824       p|gID;
825       p|idx;
826       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
827       CmiAssert(mgr);
828       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
829     }
830     }
831     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
832     contribute(cb);
833   }
834
835   void CkMemCheckPT::notifyReplicaDie(int pe){
836     //CkPrintf("[%d] receive replica die\n",CkMyPe());
837     replicaAlive = 0;
838     CpvAccess(_remoteCrashedNode) = pe;
839   }
840
841   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
842   {
843 #if CMK_CHKP_ALL
844     int idx = 1;
845     if(msg->bud1 == CkMyPe()){
846       idx = 0;
847     }
848     int isChkpting = msg->cp_flag;
849     if(isChkpting == 1){
850       if(chkpTable[idx]) delete chkpTable[idx];
851     }
852     chkpTable[idx] = msg;
853     if(isChkpting){
854       recvCount++;
855       if(recvCount == 2){
856         if (where == CkCheckPoint_inMEM) {
857           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
858         }
859         else if (where == CkCheckPoint_inDISK) {
860           // another barrier for finalize the writing using fsync
861           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
862           contribute(0,NULL,CkReduction::sum_int,localcb);
863         }
864         else
865           CmiAbort("Unknown checkpoint scheme");
866         recvCount = 0;
867       }
868     }
869 #endif
870   }
871
872   // don't handle array elements
873   static inline void _handleProcData(PUP::er &p, CmiBool create)
874   {
875     // save readonlys, and callback BTW
876     CkPupROData(p);
877
878     // save mainchares 
879     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
880
881 #ifndef CMK_CHARE_USE_PTR
882     // save non-migratable chare
883     CkPupChareData(p);
884 #endif
885
886     // save groups into Groups.dat
887     CkPupGroupData(p,create);
888
889     // save nodegroups into NodeGroups.dat
890     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
891   }
892
893   void CkMemCheckPT::sendProcData()
894   {
895     // find out size of buffer
896     int size;
897     {
898       PUP::sizer p;
899       _handleProcData(p,CmiTrue);
900       size = p.size();
901     }
902     int packSize = size;
903     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
904     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
905     {
906       PUP::toMem p(msg->packData);
907       _handleProcData(p,CmiTrue);
908     }
909     msg->pe = CkMyPe();
910     msg->len = size;
911     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
912     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
913   }
914
915   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
916   {
917     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
918     CpvAccess(procChkptBuf) = msg;
919     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
920     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
921   }
922
923   // ArrayElement call this function to give us the checkpointed data
924   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
925   {
926     int len = ckTable.length();
927     int idx;
928     for (idx=0; idx<len; idx++) {
929       CkCheckPTInfo *entry = ckTable[idx];
930       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
931     }
932     CkAssert(idx < len);
933     int isChkpting = msg->cp_flag;
934     ckTable[idx]->updateBuffer(msg);
935     if (isChkpting) {
936       // all my array elements have returned their inmem data
937       // inform starter processor that I am done.
938       recvCount ++;
939       if (recvCount == ckTable.length()) {
940         if (where == CkCheckPoint_inMEM) {
941           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
942         }
943         else if (where == CkCheckPoint_inDISK) {
944           // another barrier for finalize the writing using fsync
945           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
946           contribute(0,NULL,CkReduction::sum_int,localcb);
947         }
948         else
949           CmiAbort("Unknown checkpoint scheme");
950         recvCount = 0;
951       } 
952     }
953   }
954
955   // only used in disk checkpointing
956   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
957   {
958     delete m;
959 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
960     system("sync");
961 #endif
962     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
963   }
964
965   // only is called on cpStarter when checkpoint is done
966   void CkMemCheckPT::cpFinish()
967   {
968     CmiAssert(CkMyPe() == cpStarter);
969     peCount++;
970     // now that all processors have finished, activate callback
971     if (peCount == 2) 
972     {
973       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
974       peCount = 0;
975       thisProxy.report();
976     }
977   }
978
979   // for debugging, report checkpoint info
980   void CkMemCheckPT::report()
981   {
982     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
983     inCheckpointing = 0;
984 #if !CMK_CHKP_ALL
985     int objsize = 0;
986     int len = ckTable.length();
987     for (int i=0; i<len; i++) {
988       CkCheckPTInfo *entry = ckTable[i];
989       CmiAssert(entry);
990       objsize += entry->getSize();
991     }
992     CmiAssert(CpvAccess(procChkptBuf));
993     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
994 #else
995     if(CkMyPe()==0)
996       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
997 #endif
998   }
999
1000   /*****************************************************************************
1001     RESTART Procedure
1002    *****************************************************************************/
1003
1004   // master processor of two buddies
1005   inline int CkMemCheckPT::isMaster(int buddype)
1006   {
1007 #if 0
1008     int mype = CkMyPe();
1009     //CkPrintf("ismaster: %d %d\n", pe, mype);
1010     if (CkNumPes() - totalFailed() == 2) {
1011       return mype > buddype;
1012     }
1013     for (int i=1; i<CkNumPes(); i++) {
1014       int me = (buddype+i)%CkNumPes();
1015       if (isFailed(me)) continue;
1016       if (me == mype) return 1;
1017       else return 0;
1018     }
1019     return 0;
1020 #else
1021     // smaller one
1022     int mype = CkMyPe();
1023     //CkPrintf("ismaster: %d %d\n", pe, mype);
1024     if (CkNumPes() - totalFailed() == 2) {
1025       return mype < buddype;
1026     }
1027 #if NODE_CHECKPOINT
1028     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1029     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1030 #else
1031       for (int i=1; i<CkNumPes(); i++) {
1032 #endif
1033         int me = (mype+i)%CkNumPes();
1034         if (isFailed(me)) continue;
1035         if (me == buddype) return 1;
1036         else return 0;
1037       }
1038       return 0;
1039 #endif
1040     }
1041
1042
1043
1044 #if 0
1045     // helper class to pup all elements that belong to same ckLocMgr
1046     class ElementDestoryer : public CkLocIterator {
1047       private:
1048         CkLocMgr *locMgr;
1049       public:
1050         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1051         void addLocation(CkLocation &loc) {
1052           CkArrayIndex idx=loc.getIndex();
1053           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1054           loc.destroy();
1055         }
1056     };
1057 #endif
1058
1059     // restore the bitmap vector for LB
1060     void CkMemCheckPT::resetLB(int diepe)
1061     {
1062 #if CMK_LBDB_ON
1063       int i;
1064       char *bitmap = new char[CkNumPes()];
1065       // set processor available bitmap
1066       get_avail_vector(bitmap);
1067       for (i=0; i<failedPes.length(); i++)
1068         bitmap[failedPes[i]] = 0; 
1069       bitmap[diepe] = 0;
1070
1071 #if CK_NO_PROC_POOL
1072       set_avail_vector(bitmap);
1073 #endif
1074
1075       // if I am the crashed pe, rebuild my failedPEs array
1076       if (CkMyNode() == diepe)
1077         for (i=0; i<CkNumPes(); i++) 
1078           if (bitmap[i]==0) failed(i);
1079
1080       delete [] bitmap;
1081 #endif
1082     }
1083
1084     static double restartT;
1085
1086     // in case when failedPe dies, everybody go through its checkpoint table:
1087     // destory all array elements
1088     // recover lost buddies
1089     // reconstruct all array elements from check point data
1090     // called on all processors
1091     void CkMemCheckPT::restart(int diePe)
1092     {
1093 #if CMK_MEM_CHECKPOINT
1094       double curTime = CmiWallTimer();
1095       if (CkMyPe() == diePe){
1096         restartT = CmiWallTimer();
1097         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1098       }
1099       stage = (char*)"resetLB";
1100       startTime = curTime;
1101       if (CkMyPe() == diePe)
1102         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1103
1104 #if CK_NO_PROC_POOL
1105       failed(diePe);    // add into the list of failed pes
1106 #endif
1107       thisFailedPe = diePe;
1108
1109       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1110
1111       inRestarting = 1;
1112
1113       // disable load balancer's barrier
1114       //if (CkMyPe() != diePe) resetLB(diePe);
1115
1116       CKLOCMGR_LOOP(mgr->startInserting(););
1117
1118
1119       if(CmiNumPartition()==1){
1120         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1121       }else{
1122         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1123         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1124       }
1125 #endif
1126     }
1127
1128     // loally remove all array elements
1129     void CkMemCheckPT::removeArrayElements()
1130     {
1131 #if CMK_MEM_CHECKPOINT
1132       int len = ckTable.length();
1133       double curTime = CmiWallTimer();
1134       if (CkMyPe() == thisFailedPe) 
1135         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1136       stage = (char*)"removeArrayElements";
1137       startTime = curTime;
1138
1139       //  if (cpCallback.isInvalid()) 
1140       //          CkPrintf("invalid pe %d\n",CkMyPe());
1141       //          CkAbort("Didn't set restart callback\n");;
1142       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1143
1144       // get rid of all buffering and remote recs
1145       // including destorying all array elements
1146 #if CK_NO_PROC_POOL  
1147       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1148 #else
1149       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1150 #endif
1151       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1152 #endif
1153     }
1154
1155     // flush state in reduction manager
1156     void CkMemCheckPT::resetReductionMgr()
1157     {
1158       if (CkMyPe() == thisFailedPe) 
1159         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1160       int numGroups = CkpvAccess(_groupIDTable)->size();
1161       for(int i=0;i<numGroups;i++) {
1162         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1163         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1164         obj->flushStates();
1165         obj->ckJustMigrated();
1166       }
1167       // reset again
1168       //CpvAccess(_qd)->flushStates();
1169       if(CmiNumPartition()==1){
1170         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1171       }
1172       else
1173         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1174     }
1175
1176     // recover the lost buddies
1177     void CkMemCheckPT::recoverBuddies()
1178     {
1179       int idx;
1180       int len = ckTable.length();
1181       // ready to flush reduction manager
1182       // cannot be CkMemCheckPT::restart because destory will modify states
1183       double curTime = CmiWallTimer();
1184       if (CkMyPe() == thisFailedPe)
1185         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1186       stage = (char *)"recoverBuddies";
1187       if (CkMyPe() == thisFailedPe)
1188         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1189       startTime = curTime;
1190
1191       // recover buddies
1192       expectCount = 0;
1193 #if !CMK_CHKP_ALL
1194       for (idx=0; idx<len; idx++) {
1195         CkCheckPTInfo *entry = ckTable[idx];
1196         if (entry->pNo == thisFailedPe) {
1197 #if CK_NO_PROC_POOL
1198           // find a new buddy
1199           int budPe = BuddyPE(CkMyPe());
1200 #else
1201           int budPe = thisFailedPe;
1202 #endif
1203           CkArrayCheckPTMessage *msg = entry->getCopy();
1204           msg->bud1 = budPe;
1205           msg->bud2 = CkMyPe();
1206           msg->cp_flag = 0;            // not checkpointing
1207           thisProxy[budPe].recoverEntry(msg);
1208           expectCount ++;
1209         }
1210       }
1211 #else
1212       //send to failed pe
1213       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1214 #if CK_NO_PROC_POOL
1215         // find a new buddy
1216         int budPe = BuddyPE(CkMyPe());
1217 #else
1218         int budPe = thisFailedPe;
1219 #endif
1220         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1221         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1222         msg->cp_flag = 0;            // not checkpointing
1223         msg->bud1 = budPe;
1224         msg->bud2 = CkMyPe();
1225         thisProxy[budPe].recoverEntry(msg);
1226         expectCount ++;
1227       }
1228 #endif
1229
1230       if (expectCount == 0) {
1231         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1232       }
1233       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1234     }
1235
1236     void CkMemCheckPT::gotData()
1237     {
1238       ackCount ++;
1239       if (ackCount == expectCount) {
1240         ackCount = 0;
1241         expectCount = -1;
1242         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1243         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1244       }
1245     }
1246
1247     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1248     {
1249
1250       for (int i=0; i<n; i++) {
1251         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1252         mgr->updateLocation(idx[i], nowOnPe);
1253       }
1254       thisProxy[nowOnPe].gotReply();
1255     }
1256
1257     // restore array elements
1258     void CkMemCheckPT::recoverArrayElements()
1259     {
1260       if(CmiMyPartition()==1&&CkMyPe()==0){
1261         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1262         fflush(stdout);
1263       }
1264       double curTime = CmiWallTimer();
1265       int len = ckTable.length();
1266       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1267       stage = (char *)"recoverArrayElements";
1268       if (CkMyPe() == thisFailedPe)
1269         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1270       startTime = curTime;
1271       int flag = 0;
1272       // recover all array elements
1273       int count = 0;
1274
1275 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1276       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1277       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1278 #endif
1279
1280 #if !CMK_CHKP_ALL
1281       for (int idx=0; idx<len; idx++)
1282       {
1283         CkCheckPTInfo *entry = ckTable[idx];
1284 #if CK_NO_PROC_POOL
1285         // the bigger one will do 
1286         //    if (CkMyPe() < entry->pNo) continue;
1287         if (!isMaster(entry->pNo)) continue;
1288 #else
1289         // smaller one do it, which has the original object
1290         if (CkMyPe() == entry->pNo+1 || 
1291             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1292 #endif
1293         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1294
1295         entry->updateBuddy(CkMyPe(), entry->pNo);
1296         CkArrayCheckPTMessage *msg = entry->getCopy();
1297         // gzheng
1298         //thisProxy[CkMyPe()].inmem_restore(msg);
1299         inmem_restore(msg);
1300 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1301         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1302         int homePe = mgr->homePe(msg->index);
1303         if (homePe != CkMyPe()) {
1304           gmap[homePe].push_back(msg->locMgr);
1305           imap[homePe].push_back(msg->index);
1306         }
1307 #endif
1308         CkFreeMsg(msg);
1309         count ++;
1310       }
1311 #else
1312       char * packData;
1313       if(CmiNumPartition()==1){
1314         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1315         packData = (char *)msg->packData;
1316       }
1317       else{
1318         int pointer = CpvAccess(curPointer);
1319         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1320         packData = msg->packData;
1321       }
1322 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1323       recoverAll(packData,gmap,imap);
1324 #else
1325       recoverAll(packData);
1326 #endif
1327 #endif
1328       curTime = CmiWallTimer();
1329 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1330       for (int i=0; i<CkNumPes(); i++) {
1331         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1332           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1333           flag++;       
1334         }
1335       }
1336       delete [] imap;
1337       delete [] gmap;
1338 #endif
1339       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1340
1341       CKLOCMGR_LOOP(mgr->doneInserting(););
1342
1343       // _crashedNode = -1;
1344       CpvAccess(_crashedNode) = -1;
1345 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1346       if (CkMyPe() == 0)
1347         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1348 #else
1349       if(flag == 0)
1350       {
1351         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1352       }
1353 #endif
1354       if(CmiMyPartition()==1&&CkMyPe()==0){
1355         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1356         fflush(stdout);
1357       }
1358     }
1359
1360     void CkMemCheckPT::gotReply(){
1361       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1362     }
1363
1364     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1365       if(CmiMyPartition()==1&&CkMyPe()==0){
1366         CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1367         fflush(stdout);
1368       }
1369 #if CMK_CHKP_ALL
1370       PUP::fromMem p(packData);
1371       int numElements;
1372       p|numElements;
1373       if(p.isUnpacking()){
1374         for(int i=0;i<numElements;i++){
1375           CkGroupID gID;
1376           CkArrayIndex idx;
1377           p|gID;
1378           p|idx;
1379           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1380           int homePe = mgr->homePe(idx);
1381 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1382           mgr->resume(idx,p,CmiTrue,CmiTrue);
1383 #else
1384           if(CmiNumPartition()==1)
1385             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1386           else{
1387             if(CkMyPe()==thisFailedPe){
1388               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1389             }
1390             else{
1391               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1392             }
1393           }
1394 #endif
1395 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1396           homePe = mgr->homePe(idx);
1397           if (homePe != CkMyPe()) {
1398             gmap[homePe].push_back(gID);
1399             imap[homePe].push_back(idx);
1400           }
1401 #endif
1402         }
1403       }
1404 #endif
1405       if(CmiMyPartition()==1&&CkMyPe()==0){
1406         CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1407         fflush(stdout);
1408       }
1409     }
1410
1411
1412     // on every processor
1413     // turn load balancer back on
1414     void CkMemCheckPT::finishUp()
1415     {
1416       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1417       //CKLOCMGR_LOOP(mgr->doneInserting(););
1418
1419       if (CkMyPe() == thisFailedPe)
1420       {
1421         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1422         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1423         fflush(stdout);
1424       }
1425       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1426       inRestarting = 0;
1427
1428 #if CMK_CONVERSE_MPI    
1429       if(CmiNumPartition()!=1){
1430         CpvAccess(recvdProcChkp) = 0;
1431         CpvAccess(recvdArrayChkp) = 0;
1432         CpvAccess(curPointer)^=1;
1433         //notify my replica, restart is done
1434         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1435         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1436         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1437       }
1438       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1439         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1440       }
1441 #endif
1442
1443 #if CK_NO_PROC_POOL
1444 #if NODE_CHECKPOINT
1445       int numnodes = CmiNumPhysicalNodes();
1446 #else
1447       int numnodes = CkNumPes();
1448 #endif
1449       if (numnodes-totalFailed() <=2) {
1450         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1451         _memChkptOn = 0;
1452       }
1453 #endif
1454     }
1455
1456     void CkMemCheckPT::recoverFromSoftFailure()
1457     {
1458       inRestarting = 0;
1459       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1460     }
1461     // called only on 0
1462     void CkMemCheckPT::quiescence(CkCallback &cb)
1463     {
1464       static int pe_count = 0;
1465       pe_count ++;
1466       CmiAssert(CkMyPe() == 0);
1467       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1468       if (pe_count == CkNumPes()) {
1469         pe_count = 0;
1470         cb.send();
1471       }
1472     }
1473
1474     // User callable function - to start a checkpoint
1475     // callback cb is used to pass control back
1476     void CkStartMemCheckpoint(CkCallback &cb)
1477     {
1478 #if CMK_MEM_CHECKPOINT
1479       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1480       if (_memChkptOn == 0) {
1481         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1482         cb.send();
1483         return;
1484       }
1485       if (CkInRestarting()) {
1486         // trying to checkpointing during restart
1487         cb.send();
1488         return;
1489       }
1490       // store user callback and user data
1491       CkMemCheckPT::cpCallback = cb;
1492
1493       // broadcast to start check pointing
1494       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1495       checkptMgr.doItNow(CkMyPe(), cb);
1496 #else
1497       // when mem checkpoint is disabled, invike cb immediately
1498       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1499       cb.send();
1500 #endif
1501     }
1502
1503     void CkRestartCheckPoint(int diePe)
1504     {
1505       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1506       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1507       // broadcast
1508       checkptMgr.restart(diePe);
1509     }
1510
1511     static int _diePE = -1;
1512
1513     // callback function used locally by ccs handler
1514     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1515     {
1516       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1517       CkRestartCheckPoint(_diePE);
1518     }
1519
1520
1521     // called on crashed PE
1522     static void restartBeginHandler(char *msg)
1523     {
1524       CmiFree(msg);
1525 #if CMK_MEM_CHECKPOINT
1526 #if CMK_USE_BARRIER
1527       if(CkMyPe()!=_diePE){
1528         printf("restar begin on %d\n",CkMyPe());
1529         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1530         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1531         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1532       }else{
1533         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1534         CkRestartCheckPointCallback(NULL, NULL);
1535       }
1536 #else
1537       static int count = 0;
1538       CmiAssert(CkMyPe() == _diePE);
1539       count ++;
1540       if (count == CkNumPes()) {
1541         printf("restart begin on %d\n",CkMyPe());
1542         CkRestartCheckPointCallback(NULL, NULL);
1543         count = 0;
1544       }
1545 #endif
1546 #endif
1547     }
1548
1549     extern void _discard_charm_message();
1550     extern void _resume_charm_message();
1551
1552     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1553       return data;
1554     }
1555
1556     static void restartBcastHandler(char *msg)
1557     {
1558 #if CMK_MEM_CHECKPOINT
1559       // advance phase counter
1560       CkMemCheckPT::inRestarting = 1;
1561       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1562       // gzheng
1563       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1564
1565       if (CkMyPe()==_diePE)
1566         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1567
1568       // reset QD counters
1569       /*  gzheng
1570           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1571        */
1572
1573       /*  gzheng
1574           if (CkMyPe()==_diePE)
1575           CkRestartCheckPointCallback(NULL, NULL);
1576        */
1577       CmiFree(msg);
1578
1579       _resume_charm_message();
1580
1581       // reduction
1582       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1583       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1584 #if CMK_USE_BARRIER
1585       //CmiPrintf("before reduce\n");   
1586       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1587       //CmiPrintf("after reduce\n");    
1588 #else
1589       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1590 #endif 
1591       checkpointed = 0;
1592 #endif
1593     }
1594
1595     extern void _initDone();
1596
1597     bool compare(char * buf1, char *buf2){
1598         PUP::checker pchecker(buf1,buf2);
1599                 pchecker.skip();
1600
1601                 int numElements;
1602                 pchecker|numElements;
1603                 for(int i=0;i<numElements;i++){
1604       //for(int i=0;i<1;i++){
1605       CkGroupID gID;
1606       CkArrayIndex idx;
1607
1608       pchecker|gID;
1609       pchecker|idx;
1610
1611       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1612       mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1613       }
1614       //return pchecker.getResult();
1615       return true;
1616     }
1617
1618     static void recvRemoteChkpHandler(char *msg){
1619       envelope *env = (envelope *)msg;
1620       CkUnpackMessage(&env);
1621       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1622       if(CpvAccess(recvdLocal)==1){
1623         int pointer = CpvAccess(curPointer);
1624         int size = CpvAccess(chkpBuf)[pointer]->len;
1625         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1626           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1627         }else
1628         {
1629           CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1630           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1631         }
1632         delete chkpMsg;
1633         if(CkMyPe()==0)
1634           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1635       }else{
1636         CpvAccess(recvdRemote) = 1;
1637         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1638         CpvAccess(buddyBuf) = chkpMsg;
1639       }  
1640     }
1641
1642     static void replicaRecoverHandler(char *msg){
1643       CpvAccess(_remoteCrashedNode) = -1;
1644       CkMemCheckPT::replicaAlive = 1;
1645       //fflush(stdout);
1646       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1647       bool ret = true;
1648       CpvAccess(remoteChkpDone) = 1;
1649       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1650       CmiFree(msg);
1651   
1652     }
1653     static void replicaChkpDoneHandler(char *msg){
1654       CpvAccess(remoteChkpDone) = 1;
1655       if(CpvAccess(localChkpDone) == 1)
1656         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1657       CmiFree(msg);
1658     }
1659
1660     static void replicaDieHandler(char * msg){
1661 #if CMK_CONVERSE_MPI    
1662       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1663       CpvAccess(_remoteCrashedNode) = diePe;
1664       CkMemCheckPT::replicaAlive = 0;
1665       if(CkMyPe()==diePe){
1666         CmiPrintf("pe %d in replicad word die\n",diePe);
1667         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1668         fflush(stdout);
1669       }
1670       find_spare_mpirank(diePe,CmiMyPartition()^1);
1671 #endif
1672       //broadcast to my partition to get local max iter
1673       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1674       CmiFree(msg);
1675     }
1676
1677
1678     static void replicaDieBcastHandler(char *msg){
1679       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1680       CpvAccess(_remoteCrashedNode) = diePe;
1681       CkMemCheckPT::replicaAlive = 0;
1682       CmiFree(msg);
1683     }
1684
1685     static void recoverRemoteProcDataHandler(char *msg){
1686       if(CmiMyPartition()==1&&CkMyPe()==0){
1687         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1688         fflush(stdout);
1689       }
1690       envelope *env = (envelope *)msg;
1691       CkUnpackMessage(&env);
1692       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1693
1694       //store the checkpoint
1695       int pointer = procMsg->pointer;
1696
1697
1698       if(CkMyPe()==CpvAccess(_crashedNode)){
1699         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1700         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1701         PUP::fromMem p(procMsg->packData);
1702         _handleProcData(p,CmiTrue);
1703         _initDone();
1704       }
1705       else{
1706         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1707         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1708         //_handleProcData(p,CmiFalse);
1709       }
1710       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1711       CKLOCMGR_LOOP(mgr->startInserting(););
1712
1713       CpvAccess(recvdProcChkp) =1;
1714       if(CpvAccess(recvdArrayChkp)==1){
1715         _resume_charm_message();
1716         _diePE = CpvAccess(_crashedNode);
1717         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1718         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1719         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1720       }
1721       if(CmiMyPartition()==1&&CkMyPe()==0){
1722         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1723         fflush(stdout);
1724       }
1725     }
1726
1727     static void recoverRemoteArrayDataHandler(char *msg){
1728       envelope *env = (envelope *)msg;
1729       CkUnpackMessage(&env);
1730       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1731       if(CmiMyPartition()==1&&CkMyPe()==0){
1732         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1733         fflush(stdout);
1734       }
1735
1736       //store the checkpoint
1737       int pointer = chkpMsg->pointer;
1738       CpvAccess(curPointer) = pointer;
1739       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1740       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1741       CpvAccess(recvdArrayChkp) =1;
1742       CkMemCheckPT::inRestarting = 1;
1743       if(CpvAccess(recvdProcChkp) == 1){
1744         _resume_charm_message();
1745         _diePE = CpvAccess(_crashedNode);
1746         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1747         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1748         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1749         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1750       }
1751       if(CmiMyPartition()==1&&CkMyPe()==0){
1752         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1753         fflush(stdout);
1754       }
1755     }
1756
1757     static void recvPhaseHandler(char * msg)
1758     {
1759       CpvAccess(_curRestartPhase)--;
1760       CkMemCheckPT::inRestarting = 1;
1761       CmiFree(msg);
1762     }
1763     // called on crashed processor
1764     static void recoverProcDataHandler(char *msg)
1765     {
1766 #if CMK_MEM_CHECKPOINT
1767       int i;
1768       envelope *env = (envelope *)msg;
1769       CkUnpackMessage(&env);
1770       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1771       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1772       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1773       PUP::fromMem p(procMsg->packData);
1774       _handleProcData(p,CmiTrue);
1775
1776       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1777       // gzheng
1778       CKLOCMGR_LOOP(mgr->startInserting(););
1779
1780       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1781       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1782       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1783       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1784
1785       _initDone();
1786       //   CpvAccess(_qd)->flushStates();
1787       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1788 #endif
1789     }
1790     //for replica, got the phase number from my neighbor processor in the current partition
1791     static void askPhaseHandler(char *msg)
1792     {
1793 #if CMK_MEM_CHECKPOINT
1794       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1795       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1796       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1797       CmiSetHandler(msg, recvPhaseHandlerIdx);
1798       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1799 #endif
1800     }
1801     // called on its backup processor
1802     // get backup message buffer and sent to crashed processor
1803     static void askProcDataHandler(char *msg)
1804     {
1805 #if CMK_MEM_CHECKPOINT
1806       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1807       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1808       if (CpvAccess(procChkptBuf) == NULL)  {
1809         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1810         CkAbort("no checkpoint found");
1811       }
1812       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1813       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1814
1815       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1816
1817       CkPackMessage(&env);
1818       CmiSetHandler(env, recoverProcDataHandlerIdx);
1819       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1820       CpvAccess(procChkptBuf) = NULL;
1821       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1822 #endif
1823     }
1824
1825     // called on PE 0
1826     void qd_callback(void *m)
1827     {
1828       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1829         fflush(stdout);
1830       CkFreeMsg(m);
1831       if(CmiNumPartition()==1){
1832 #ifdef CMK_SMP
1833         for(int i=0;i<CmiMyNodeSize();i++){
1834           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1835           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1836           CmiSetHandler(msg, askProcDataHandlerIdx);
1837           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1838           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1839         }
1840         return;
1841 #endif
1842         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1843         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1844         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1845         CmiSetHandler(msg, askProcDataHandlerIdx);
1846         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1847         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1848       }
1849       else{
1850         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1851         CmiSetHandler(msg, recvPhaseHandlerIdx);
1852         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1853       }
1854     }
1855
1856     // on crashed node
1857     void CkMemRestart(const char *dummy, CkArgMsg *args)
1858     {
1859 #if CMK_MEM_CHECKPOINT
1860       _diePE = CmiMyNode();
1861       CpvAccess( _crashedNode )= CmiMyNode();
1862       CkMemCheckPT::inRestarting = 1;
1863       _discard_charm_message();
1864
1865       /*if(CmiMyRank()==0){
1866         CkCallback cb(qd_callback);
1867         CkStartQD(cb);
1868         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1869         }*/
1870       if(CmiNumPartition()==1){
1871         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1872         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1873         restartT = CmiWallTimer();
1874         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1875         fflush(stdout);
1876         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1877         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1878         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1879         CmiSetHandler(msg, askProcDataHandlerIdx);
1880         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1881         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1882       }
1883       else{
1884         CkCallback cb(qd_callback);
1885         CkStartQD(cb);
1886       }
1887 #else
1888       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1889 #endif
1890     }
1891
1892     // can be called in other files
1893     // return true if it is in restarting
1894     extern "C"
1895       int CkInRestarting()
1896       {
1897 #if CMK_MEM_CHECKPOINT
1898         if (CpvAccess( _crashedNode)!=-1) return 1;
1899         // gzheng
1900         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1901         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1902         return CkMemCheckPT::inRestarting;
1903 #else
1904         return 0;
1905 #endif
1906       }
1907
1908     extern "C"
1909       int CkReplicaAlive()
1910       {
1911         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1912         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1913         return CkMemCheckPT::replicaAlive;
1914
1915         /*if(CkMemCheckPT::replicaDead==1)
1916           return 0;
1917           else          
1918           return 1;*/
1919       }
1920
1921     extern "C"
1922       int CkInCheckpointing()
1923       {
1924         return CkMemCheckPT::inCheckpointing;
1925       }
1926
1927     extern "C"
1928       void CkSetInLdb(){
1929 #if CMK_MEM_CHECKPOINT
1930         CkMemCheckPT::inLoadbalancing = 1;
1931 #endif
1932       }
1933
1934     extern "C"
1935       int CkInLdb(){
1936 #if CMK_MEM_CHECKPOINT
1937         return CkMemCheckPT::inLoadbalancing;
1938 #endif
1939         return 0;
1940       }
1941
1942     extern "C"
1943       void CkResetInLdb(){
1944 #if CMK_MEM_CHECKPOINT
1945         CkMemCheckPT::inLoadbalancing = 0;
1946 #endif
1947       }
1948
1949     /*****************************************************************************
1950       module initialization
1951      *****************************************************************************/
1952
1953     static int arg_where = CkCheckPoint_inMEM;
1954
1955 #if CMK_MEM_CHECKPOINT
1956     void init_memcheckpt(char **argv)
1957     {
1958       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1959         arg_where = CkCheckPoint_inDISK;
1960       }
1961       // initiliazing _crashedNode variable
1962       CpvInitialize(int, _crashedNode);
1963       CpvInitialize(int, _remoteCrashedNode);
1964       CpvAccess(_crashedNode) = -1;
1965       CpvAccess(_remoteCrashedNode) = -1;
1966       init_FI(argv);
1967
1968     }
1969 #endif
1970
1971     class CkMemCheckPTInit: public Chare {
1972       public:
1973         CkMemCheckPTInit(CkArgMsg *m) {
1974 #if CMK_MEM_CHECKPOINT
1975           if (arg_where == CkCheckPoint_inDISK) {
1976             CkPrintf("Charm++> Double-disk Checkpointing. \n");
1977           }
1978           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1979           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1980 #endif
1981         }
1982     };
1983
1984     static void notifyHandler(char *msg)
1985     {
1986 #if CMK_MEM_CHECKPOINT
1987       CmiFree(msg);
1988       /* immediately increase restart phase to filter old messages */
1989       CpvAccess(_curRestartPhase) ++;
1990       CpvAccess(_qd)->flushStates();
1991       _discard_charm_message();
1992
1993 #endif
1994     }
1995
1996     extern "C"
1997       void notify_crash(int node)
1998       {
1999 #ifdef CMK_MEM_CHECKPOINT
2000         CpvAccess( _crashedNode) = node;
2001 #ifdef CMK_SMP
2002         for(int i=0;i<CkMyNodeSize();i++){
2003           CpvAccessOther(_crashedNode,i)=node;
2004         }
2005 #endif
2006         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2007         CkMemCheckPT::inRestarting = 1;
2008
2009         // this may be in interrupt handler, send a message to reset QD
2010         int pe = CmiNodeFirst(CkMyNode());
2011         for(int i=0;i<CkMyNodeSize();i++){
2012           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2013           CmiSetHandler(msg, notifyHandlerIdx);
2014           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2015         }
2016 #endif
2017       }
2018
2019     extern "C" void (*notify_crash_fn)(int node);
2020
2021
2022 #if CMK_CONVERSE_MPI
2023     void buddyDieHandler(char *msg)
2024     {
2025 #if CMK_MEM_CHECKPOINT
2026       // notify
2027       CkMemCheckPT::inRestarting = 1;
2028       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2029       notify_crash(diepe);
2030       // send message to crash pe to let it restart
2031       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2032       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2033       int buddy = obj->BuddyPE(CmiMyPe());
2034       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2035       if (buddy == diepe)  {
2036         mpi_restart_crashed(diepe, newrank);
2037       }
2038 #endif
2039     }
2040
2041     void pingHandler(void *msg)
2042     {
2043       lastPingTime = CmiWallTimer();
2044       CmiFree(msg);
2045     }
2046
2047     void pingCheckHandler()
2048     {
2049 #if CMK_MEM_CHECKPOINT
2050       double now = CmiWallTimer();
2051       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2052         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2053         int i, pe, buddy;
2054         // tell everyone the buddy dies
2055         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2056         for (i = 1; i < CmiNumPes(); i++) {
2057           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2058           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2059         }
2060         buddy = pe;
2061         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2062         fflush(stdout);
2063         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2064         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2065         CmiSetHandler(msg, buddyDieHandlerIdx);
2066         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2067         //send to everyone in the other world
2068         if(CmiNumPartition()!=1){
2069           for(int i=0;i<CmiNumPes();i++){
2070             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2071             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2072             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2073             //fflush(stdout);
2074             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2075             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2076           }
2077         }
2078       }
2079         else 
2080           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2081 #endif
2082       }
2083
2084       void pingBuddy()
2085       {
2086 #if CMK_MEM_CHECKPOINT
2087         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2088         if (obj) {
2089           int buddy = obj->BuddyPE(CkMyPe());
2090           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2091           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2092           CmiSetHandler(msg, pingHandlerIdx);
2093           CmiGetRestartPhase(msg) = 9999;
2094           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2095         }
2096         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2097 #endif
2098       }
2099 #endif
2100
2101       // initproc
2102       void CkRegisterRestartHandler( )
2103       {
2104 #if CMK_MEM_CHECKPOINT
2105         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2106         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2107         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2108         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2109         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2110
2111         //for replica
2112         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2113         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2114         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2115         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2116         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2117         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2118         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2119         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2120         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2121
2122 #if CMK_CONVERSE_MPI
2123         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2124         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2125         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2126 #endif
2127
2128         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2129         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2130         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2131         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2132         CpvInitialize(int,curPointer);
2133         CpvInitialize(int,recvdLocal);
2134         CpvInitialize(int,localChkpDone);
2135         CpvInitialize(int,remoteChkpDone);
2136         CpvInitialize(int,recvdRemote);
2137         CpvInitialize(int,recvdProcChkp);
2138         CpvInitialize(int,recvdArrayChkp);
2139
2140         CpvAccess(procChkptBuf) = NULL;
2141         CpvAccess(buddyBuf) = NULL;
2142         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2143         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2144         CpvAccess(chkpBuf)[0] = NULL;
2145         CpvAccess(chkpBuf)[1] = NULL;
2146         CpvAccess(localProcChkpBuf)[0] = NULL;
2147         CpvAccess(localProcChkpBuf)[1] = NULL;
2148
2149         CpvAccess(curPointer) = 0;
2150         CpvAccess(recvdLocal) = 0;
2151         CpvAccess(localChkpDone) = 0;
2152         CpvAccess(remoteChkpDone) = 0;
2153         CpvAccess(recvdRemote) = 0;
2154         CpvAccess(recvdProcChkp) = 0;
2155         CpvAccess(recvdArrayChkp) = 0;
2156
2157         notify_crash_fn = notify_crash;
2158
2159 #if ! CMK_CONVERSE_MPI
2160         // print pid to kill
2161         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2162         //  sleep(4);
2163 #endif
2164 #endif
2165       }
2166
2167
2168       extern "C"
2169         int CkHasCheckpoints()
2170         {
2171           return checkpointed;
2172         }
2173
2174       /// @todo: the following definitions should be moved to a separate file containing
2175       // structures and functions about fault tolerance strategies
2176
2177       /**
2178        *  * @brief: function for killing a process                                             
2179        *   */
2180 #ifdef CMK_MEM_CHECKPOINT
2181 #if CMK_HAS_GETPID
2182       void killLocal(void *_dummy,double curWallTime){
2183         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2184         if(CmiWallTimer()<killTime-1){
2185           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2186         }else{ 
2187 #if CMK_CONVERSE_MPI
2188           CkDieNow();
2189 #else 
2190           kill(getpid(),SIGKILL);                                               
2191 #endif
2192         }              
2193       } 
2194 #else
2195       void killLocal(void *_dummy,double curWallTime){
2196         CmiAbort("kill() not supported!");
2197       }
2198 #endif
2199 #endif
2200
2201 #ifdef CMK_MEM_CHECKPOINT
2202       /**
2203        * @brief: reads the file with the kill information
2204        */
2205       void readKillFile(){
2206         FILE *fp=fopen(killFile,"r");
2207         if(!fp){
2208           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2209           return;
2210         }
2211         int proc;
2212         double sec;
2213         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2214           if(proc == CkMyNode() && CkMyRank() == 0){
2215             killTime = CmiWallTimer()+sec;
2216             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2217             CcdCallFnAfter(killLocal,NULL,sec*1000);
2218           }
2219         }
2220         fclose(fp);
2221       }
2222
2223 #if ! CMK_CONVERSE_MPI
2224       void CkDieNow()
2225       {
2226 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2227         // ignored for non-mpi version
2228         CmiPrintf("[%d] die now.\n", CmiMyPe());
2229         killTime = CmiWallTimer()+0.001;
2230         CcdCallFnAfter(killLocal,NULL,1);
2231 #endif
2232       }
2233 #endif
2234
2235 #endif
2236
2237 #include "CkMemCheckpoint.def.h"
2238
2239