fix bug from last commit
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117 }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, localChkpDone);
131 CpvDeclare(int, remoteChkpDone);
132 CpvDeclare(int, recvdArrayChkp);
133 CpvDeclare(int, recvdProcChkp);
134
135 bool compare(char * buf1, char * buf2);
136 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
137 // Converse function handles
138 static int askPhaseHandlerIdx;
139 static int recvPhaseHandlerIdx;
140 static int askProcDataHandlerIdx;
141 static int restartBcastHandlerIdx;
142 static int recoverProcDataHandlerIdx;
143 static int restartBeginHandlerIdx;
144 static int recvRemoteChkpHandlerIdx;
145 static int replicaDieHandlerIdx;
146 static int replicaDieBcastHandlerIdx;
147 static int replicaRecoverHandlerIdx;
148 static int replicaChkpDoneHandlerIdx;
149 static int recoverRemoteProcDataHandlerIdx;
150 static int recoverRemoteArrayDataHandlerIdx;
151 static int notifyHandlerIdx;
152 // compute the backup processor
153 // FIXME: avoid crashed processors
154 #if CMK_CONVERSE_MPI
155 static int pingHandlerIdx;
156 static int pingCheckHandlerIdx;
157 static int buddyDieHandlerIdx;
158 static double lastPingTime = -1;
159
160 extern "C" void mpi_restart_crashed(int pe, int rank);
161 extern "C" int  find_spare_mpirank(int pe,int partition);
162
163 void pingBuddy();
164 void pingCheckHandler();
165
166 #endif
167 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
168
169 inline int CkMemCheckPT::BuddyPE(int pe)
170 {
171   int budpe;
172 #if NODE_CHECKPOINT
173   // buddy is the processor with same rank on the next physical node
174   int r1 = CmiPhysicalRank(pe);
175   int budnode = CmiPhysicalNodeID(pe);
176   do {
177     budnode = (budnode+1)%CmiNumPhysicalNodes();
178     int *pelist;
179     int num;
180     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
181     budpe = pelist[r1 % num];
182   } while (isFailed(budpe));
183   if (budpe == pe) {
184     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
185     CmiAbort("Failed to find a buddy processor");
186   }
187 #else
188   budpe = pe;
189   budpe = (budpe+1)%CkNumPes();
190 #endif
191   return budpe;
192 }
193
194 // called in array element constructor
195 // choose and register with 2 buddies for checkpoiting 
196 #if CMK_MEM_CHECKPOINT
197 void ArrayElement::init_checkpt() {
198   if (_memChkptOn == 0) return;
199   if (CkInRestarting()) {
200     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
201   }
202   // only master init checkpoint
203   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
204
205   budPEs[0] = CkMyPe();
206   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
207   CmiAssert(budPEs[0] != budPEs[1]);
208   // inform checkPTMgr
209   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
210   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
211   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
212   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
213 }
214 #endif
215
216 // entry function invoked by checkpoint mgr asking for checkpoint data
217 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
218 #if CMK_MEM_CHECKPOINT
219   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
220   //char index[128];   thisIndexMax.sprint(index);
221   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
222   CkLocMgr *locMgr = thisArray->getLocMgr();
223   CmiAssert(myRec!=NULL);
224   int size;
225   {
226     PUP::sizer p;
227     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
228     size = p.size();
229   }
230   int packSize = size/sizeof(double) +1;
231   CkArrayCheckPTMessage *msg =
232     new (packSize, 0) CkArrayCheckPTMessage;
233   msg->len = size;
234   msg->index =thisIndexMax;
235   msg->aid = thisArrayID;
236   msg->locMgr = locMgr->getGroupID();
237   msg->cp_flag = 1;
238   {
239     PUP::toMem p(msg->packData);
240     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
241   }
242
243   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
244   checkptMgr.recvData(msg, 2, budPEs);
245   delete m;
246 #endif
247 }
248
249 // checkpoint holder class - for memory checkpointing
250 class CkMemCheckPTInfo: public CkCheckPTInfo
251 {
252   CkArrayCheckPTMessage *ckBuffer;
253   public:
254   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
255     CkCheckPTInfo(a, loc, idx, pno)
256   {
257     ckBuffer = NULL;
258   }
259   ~CkMemCheckPTInfo() 
260   {
261     if (ckBuffer) delete ckBuffer; 
262   }
263   inline void updateBuffer(CkArrayCheckPTMessage *data) 
264   {
265     CmiAssert(data!=NULL);
266     if (ckBuffer) delete ckBuffer;
267     ckBuffer = data;
268   }    
269   inline CkArrayCheckPTMessage * getCopy()
270   {
271     if (ckBuffer == NULL) {
272       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
273       CmiAbort("Abort!");
274     }
275     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
276   }     
277   inline void updateBuddy(int b1, int b2) {
278     CmiAssert(ckBuffer);
279     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
280     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
281     CmiAssert(pNo != CkMyPe());
282   }
283   inline int getSize() { 
284     CmiAssert(ckBuffer);
285     return ckBuffer->len; 
286   }
287 };
288
289 // checkpoint holder class - for in-disk checkpointing
290 class CkDiskCheckPTInfo: public CkCheckPTInfo 
291 {
292   char *fname;
293   int bud1, bud2;
294   int len;                      // checkpoint size
295   public:
296   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
297   {
298 #if CMK_USE_MKSTEMP
299     fname = new char[64];
300     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
301     mkstemp(fname);
302 #else
303     fname=tmpnam(NULL);
304 #endif
305     bud1 = bud2 = -1;
306     len = 0;
307   }
308   ~CkDiskCheckPTInfo() 
309   {
310     remove(fname);
311   }
312   inline void updateBuffer(CkArrayCheckPTMessage *data) 
313   {
314     double t = CmiWallTimer();
315     // unpack it
316     envelope *env = UsrToEnv(data);
317     CkUnpackMessage(&env);
318     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
319     FILE *f = fopen(fname,"wb");
320     PUP::toDisk p(f);
321     CkPupMessage(p, (void **)&data);
322     // delay sync to the end because otherwise the messages are blocked
323     //    fsync(fileno(f));
324     fclose(f);
325     bud1 = data->bud1;
326     bud2 = data->bud2;
327     len = data->len;
328     delete data;
329     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
330   }
331   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
332   {
333     CkArrayCheckPTMessage *data;
334     FILE *f = fopen(fname,"rb");
335     PUP::fromDisk p(f);
336     CkPupMessage(p, (void **)&data);
337     fclose(f);
338     data->bud1 = bud1;                          // update the buddies
339     data->bud2 = bud2;
340     return data;
341   }
342   inline void updateBuddy(int b1, int b2) {
343     bud1 = b1; bud2 = b2;
344     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
345     CmiAssert(pNo != CkMyPe());
346   }
347   inline int getSize() { 
348     return len; 
349   }
350 };
351
352 CkMemCheckPT::CkMemCheckPT(int w)
353 {
354   int numnodes = 0;
355 #if NODE_CHECKPOINT
356   numnodes = CmiNumPhysicalNodes();
357 #else
358   numnodes = CkNumPes();
359 #endif
360 #if CK_NO_PROC_POOL
361   if (numnodes <= 2)
362 #else
363     if (numnodes  == 1)
364 #endif
365     {
366       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
367       _memChkptOn = 0;
368     }
369   inRestarting = 0;
370   inCheckpointing = 0;
371   recvCount = peCount = 0;
372   ackCount = 0;
373   expectCount = -1;
374   where = w;
375   replicaAlive = 1;
376 #if CMK_CONVERSE_MPI
377   void pingBuddy();
378   void pingCheckHandler();
379   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
380   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
381 #endif
382   chkpTable[0] = NULL;
383   chkpTable[1] = NULL;
384   maxIter = -1;
385   recvIterCount = 0;
386   localDecided = false;
387 }
388
389 CkMemCheckPT::~CkMemCheckPT()
390 {
391   int len = ckTable.length();
392   for (int i=0; i<len; i++) {
393     delete ckTable[i];
394   }
395 }
396
397 void CkMemCheckPT::pup(PUP::er& p) 
398
399   CBase_CkMemCheckPT::pup(p); 
400   p|cpStarter;
401   p|thisFailedPe;
402   p|failedPes;
403   p|ckCheckPTGroupID;           // recover global variable
404   p|cpCallback;                 // store callback
405   p|where;                      // where to checkpoint
406   p|peCount;
407   if (p.isUnpacking()) {
408     recvCount = 0;
409 #if CMK_CONVERSE_MPI
410     void pingBuddy();
411     void pingCheckHandler();
412     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
413     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
414 #endif
415     maxIter = -1;
416     recvIterCount = 0;
417     localDecided = false;
418   }
419 }
420
421 void CkMemCheckPT::getIter(){
422   localDecided = true;
423   localMaxIter = maxIter+1;
424   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
425   int elemCount = CkCountChkpSyncElements();
426   if(elemCount == 0){
427     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
428   }
429 }
430
431 //when one replica failes, decide the next chkp iter
432
433 void CkMemCheckPT::recvIter(int iter){
434   if(maxIter<iter){
435     maxIter=iter;
436   }
437 }
438
439 void CkMemCheckPT::recvMaxIter(int iter){
440   localDecided = false;
441   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
442 }
443
444 void CkMemCheckPT::reachChkpIter(){
445   recvIterCount++;
446   elemCount = CkCountChkpSyncElements();
447   if(recvIterCount == elemCount){
448     recvIterCount = 0;
449     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
450   }
451 }
452
453 void CkMemCheckPT::startChkp(){
454   CkPrintf("start checkpoint\n");
455   CkStartMemCheckpoint(cpCallback);
456 }
457
458 // called by checkpoint mgr to restore an array element
459 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
460 {
461 #if CMK_MEM_CHECKPOINT
462   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
463   // m->index.print();
464   PUP::fromMem p(m->packData);
465   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
466   CmiAssert(mgr);
467 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
468   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
469 #else
470   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
471 #endif
472
473   // find a list of array elements bound together
474   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
475   CmiAssert(elt);
476   CkLocRec_local *rec = elt->myRec;
477   CkVec<CkMigratable *> list;
478   mgr->migratableList(rec, list);
479   CmiAssert(list.length() > 0);
480   for (int l=0; l<list.length(); l++) {
481     elt = (ArrayElement *)list[l];
482     elt->budPEs[0] = m->bud1;
483     elt->budPEs[1] = m->bud2;
484     //    reset, may not needed now
485     // for now.
486     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
487       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
488       if (c) c->redNo = 0;
489     }
490   }
491 #endif
492 }
493
494 // return 1 if pe is a crashed processor
495 int CkMemCheckPT::isFailed(int pe)
496 {
497   for (int i=0; i<failedPes.length(); i++)
498     if (failedPes[i] == pe) return 1;
499   return 0;
500 }
501
502 // add pe into history list of all failed processors
503 void CkMemCheckPT::failed(int pe)
504 {
505   if (isFailed(pe)) return;
506   failedPes.push_back(pe);
507 }
508
509 int CkMemCheckPT::totalFailed()
510 {
511   return failedPes.length();
512 }
513
514 // create an checkpoint entry for array element of aid with index.
515 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
516 {
517   // error check, no duplicate
518   int idx, len = ckTable.size();
519   for (idx=0; idx<len; idx++) {
520     CkCheckPTInfo *entry = ckTable[idx];
521     if (index == entry->index) {
522       if (loc == entry->locMgr) {
523         // bindTo array elements
524         return;
525       }
526       // for array inheritance, the following check may fail
527       // because ArrayElement constructor of all superclasses are called
528       if (aid == entry->aid) {
529         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
530         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
531       }
532     }
533   }
534   CkCheckPTInfo *newEntry;
535   if (where == CkCheckPoint_inMEM)
536     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
537   else
538     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
539   ckTable.push_back(newEntry);
540   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
541 }
542
543 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
544 {
545 #if !CMK_CHKP_ALL       
546   int buddy = msg->bud1;
547   if (buddy == CkMyPe()) buddy = msg->bud2;
548   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
549   recvData(msg);
550   // ack
551   thisProxy[buddy].gotData();
552 #else
553   chkpTable[0] = NULL;
554   chkpTable[1] = NULL;
555   recvArrayCheckpoint(msg);
556   thisProxy[msg->bud2].gotData();
557 #endif
558 }
559
560 // loop through my checkpoint table and ask checkpointed array elements
561 // to send me checkpoint data.
562 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
563 {
564   checkpointed = 1;
565   cpCallback = cb;
566   cpStarter = starter;
567   inCheckpointing = 1;
568   if (CkMyPe() == cpStarter) {
569     startTime = CmiWallTimer();
570     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
571   }
572 #if CMK_CONVERSE_MPI
573   if(CmiNumPartition()==1)
574 #endif    
575   {
576
577 #if !CMK_CHKP_ALL
578     int len = ckTable.length();
579     for (int i=0; i<len; i++) {
580       CkCheckPTInfo *entry = ckTable[i];
581       // always let the bigger number processor send request
582       //if (CkMyPe() < entry->pNo) continue;
583       // always let the smaller number processor send request, may on same proc
584       if (!isMaster(entry->pNo)) continue;
585       // call inmem_checkpoint to the array element, ask it to send
586       // back checkpoint data via recvData().
587       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
588       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
589     }
590     // if my table is empty, then I am done
591     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
592 #else
593     startArrayCheckpoint();
594 #endif
595     sendProcData();
596   }
597 #if CMK_CONVERSE_MPI
598   else
599   {
600     startCheckpoint();
601   }
602 #endif
603   // pack and send proc level data
604 }
605
606 class MemElementPacker : public CkLocIterator{
607   private:
608     CkLocMgr *locMgr;
609     PUP::er &p;
610   public:
611     MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
612     void addLocation(CkLocation &loc){
613       CkArrayIndexMax idx = loc.getIndex();
614       CkGroupID gID = locMgr->ckGetGroupID();
615       p|gID;
616       p|idx;
617       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
618     }
619 };
620
621 void pupAllElements(PUP::er &p){
622 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
623   int numElements;
624   if(!p.isUnpacking()){
625     numElements = CkCountArrayElements();
626   }
627   p | numElements;
628   if(!p.isUnpacking()){
629     CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
630   }
631 #endif
632 }
633
634 void CkMemCheckPT::startArrayCheckpoint(){
635 #if CMK_CHKP_ALL
636   int size;
637   {
638     PUP::sizer psizer;
639     pupAllElements(psizer);
640     size = psizer.size();
641   }
642   int packSize = size/sizeof(double)+1;
643   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
644   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
645   msg->len = size;
646   msg->cp_flag = 1;
647   int budPEs[2];
648   msg->bud1=CkMyPe();
649   msg->bud2=ChkptOnPe(CkMyPe());
650   {
651     PUP::toMem p(msg->packData);
652     pupAllElements(p);
653   }
654   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
655   if(chkpTable[0]) delete chkpTable[0];
656   chkpTable[0] = msg;
657   //send the checkpoint to my 
658   recvCount++;
659 #endif
660 }
661
662 void CkMemCheckPT::startCheckpoint(){
663 #if CMK_CONVERSE_MPI
664   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
665     CkPrintf("in start checkpointing!!!!\n");
666   int size;
667   {
668     PUP::sizer p;
669     _handleProcData(p,CmiFalse);
670     size = p.size();
671   }
672   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
673   procMsg->len = size;
674   procMsg->cp_flag = 1;
675   {
676     PUP::toMem p(procMsg->packData);
677     _handleProcData(p,CmiFalse);
678   }
679   int pointer = CpvAccess(curPointer);
680   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
681   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
682
683   {
684     PUP::sizer psizer;
685     pupAllElements(psizer);
686     size = psizer.size();
687   }
688   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
689   msg->len = size;
690   msg->cp_flag = 1;
691   {
692     PUP::toMem p(msg->packData);
693     pupAllElements(p);
694   }
695   pointer = CpvAccess(curPointer);
696   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
697   CpvAccess(chkpBuf)[pointer] = msg;
698   if(CkMyPe()==0)
699     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
700   if(CkReplicaAlive()==1){
701     CpvAccess(recvdLocal) = 1;
702     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
703     CkPackMessage(&env);
704     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
705     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
706   }
707   if(CpvAccess(recvdRemote)==1){
708     //compare the checkpoint 
709     int size = CpvAccess(chkpBuf)[pointer]->len;
710     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
711       thisProxy[CkMyPe()].doneComparison(true);
712     }else{
713       CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
714       thisProxy[CkMyPe()].doneComparison(false);
715     }
716     if(CkMyPe()==0)
717       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
718   }
719   else{
720     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
721       { 
722         int pointer = CpvAccess(curPointer);
723         //send the proc data
724         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
725         procMsg->pointer = pointer;
726         envelope * env = (envelope *)(UsrToEnv(procMsg));
727         CkPackMessage(&env);
728         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
729         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
730         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
731           CkPrintf("[%d] sendProcdata\n",CkMyPe());
732         }
733       }
734       //send the array checkpoint data
735       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
736       msg->pointer = CpvAccess(curPointer);
737       envelope * env = (envelope *)(UsrToEnv(msg));
738       CkPackMessage(&env);
739       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
740       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
741       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
742         CkPrintf("[%d] sendArraydata\n",CkMyPe());
743       //can continue work, no need to wait for my replica
744     }
745   }
746 #endif
747 }
748
749 void CkMemCheckPT::doneComparison(bool ret){
750   int _ret = 1;
751   if(!ret){
752     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
753     _ret = 0;
754   }else{
755     _ret = 1;
756   }
757   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
758   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
759 }
760
761 void CkMemCheckPT::doneRComparison(int ret){
762   //    if(CpvAccess(curPointer) == 0){
763   if(ret==CkNumPes()){
764     CpvAccess(localChkpDone) = 1;
765     if(CpvAccess(remoteChkpDone) ==1){
766       thisProxy.resumeFromChkp();
767     }
768     //notify the replica am done
769     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
770     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
771     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
772   }
773   else{
774     CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
775     thisProxy.RollBack();
776   }
777 }
778
779 void CkMemCheckPT::resumeFromChkp(){
780   CpvAccess(recvdRemote) = 0;
781   CpvAccess(recvdLocal) = 0;
782   CpvAccess(localChkpDone) = 0;
783   CpvAccess(remoteChkpDone) = 0;
784   CpvAccess(curPointer)^=1;
785   inCheckpointing = 0;
786   if(CkMyPe() == 0){
787     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
788   }
789   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
790 }
791
792 void CkMemCheckPT::RollBack(){
793   //restore group data
794   checkpointed = 0;
795   CkMemCheckPT::inRestarting = 1;
796   int pointer = CpvAccess(curPointer)^1;//use the previous one
797   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
798   PUP::fromMem p(chkpMsg->packData);    
799
800   //destroy array elements
801   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
802   int numGroups = CkpvAccess(_groupIDTable)->size();
803   for(int i=0;i<numGroups;i++) {
804     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
805     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
806     obj->flushStates();
807     obj->ckJustMigrated();
808   }
809   //restore array elements
810
811   int numElements;
812   p|numElements;
813
814   if(p.isUnpacking()){
815     for(int i=0;i<numElements;i++){
816       //for(int i=0;i<1;i++){
817       CkGroupID gID;
818       CkArrayIndex idx;
819       p|gID;
820       p|idx;
821       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
822       CmiAssert(mgr);
823       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
824     }
825     }
826     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
827     contribute(cb);
828   }
829
830   void CkMemCheckPT::notifyReplicaDie(int pe){
831     //CkPrintf("[%d] receive replica die\n",CkMyPe());
832     replicaAlive = 0;
833     CpvAccess(_remoteCrashedNode) = pe;
834   }
835
836   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
837   {
838 #if CMK_CHKP_ALL
839     int idx = 1;
840     if(msg->bud1 == CkMyPe()){
841       idx = 0;
842     }
843     int isChkpting = msg->cp_flag;
844     if(isChkpting == 1){
845       if(chkpTable[idx]) delete chkpTable[idx];
846     }
847     chkpTable[idx] = msg;
848     if(isChkpting){
849       recvCount++;
850       if(recvCount == 2){
851         if (where == CkCheckPoint_inMEM) {
852           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
853         }
854         else if (where == CkCheckPoint_inDISK) {
855           // another barrier for finalize the writing using fsync
856           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
857           contribute(0,NULL,CkReduction::sum_int,localcb);
858         }
859         else
860           CmiAbort("Unknown checkpoint scheme");
861         recvCount = 0;
862       }
863     }
864 #endif
865   }
866
867   // don't handle array elements
868   static inline void _handleProcData(PUP::er &p, CmiBool create)
869   {
870     // save readonlys, and callback BTW
871     CkPupROData(p);
872
873     // save mainchares 
874     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
875
876 #ifndef CMK_CHARE_USE_PTR
877     // save non-migratable chare
878     CkPupChareData(p);
879 #endif
880
881     // save groups into Groups.dat
882     CkPupGroupData(p,create);
883
884     // save nodegroups into NodeGroups.dat
885     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
886   }
887
888   void CkMemCheckPT::sendProcData()
889   {
890     // find out size of buffer
891     int size;
892     {
893       PUP::sizer p;
894       _handleProcData(p,CmiTrue);
895       size = p.size();
896     }
897     int packSize = size;
898     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
899     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
900     {
901       PUP::toMem p(msg->packData);
902       _handleProcData(p,CmiTrue);
903     }
904     msg->pe = CkMyPe();
905     msg->len = size;
906     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
907     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
908   }
909
910   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
911   {
912     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
913     CpvAccess(procChkptBuf) = msg;
914     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
915     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
916   }
917
918   // ArrayElement call this function to give us the checkpointed data
919   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
920   {
921     int len = ckTable.length();
922     int idx;
923     for (idx=0; idx<len; idx++) {
924       CkCheckPTInfo *entry = ckTable[idx];
925       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
926     }
927     CkAssert(idx < len);
928     int isChkpting = msg->cp_flag;
929     ckTable[idx]->updateBuffer(msg);
930     if (isChkpting) {
931       // all my array elements have returned their inmem data
932       // inform starter processor that I am done.
933       recvCount ++;
934       if (recvCount == ckTable.length()) {
935         if (where == CkCheckPoint_inMEM) {
936           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
937         }
938         else if (where == CkCheckPoint_inDISK) {
939           // another barrier for finalize the writing using fsync
940           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
941           contribute(0,NULL,CkReduction::sum_int,localcb);
942         }
943         else
944           CmiAbort("Unknown checkpoint scheme");
945         recvCount = 0;
946       } 
947     }
948   }
949
950   // only used in disk checkpointing
951   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
952   {
953     delete m;
954 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
955     system("sync");
956 #endif
957     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
958   }
959
960   // only is called on cpStarter when checkpoint is done
961   void CkMemCheckPT::cpFinish()
962   {
963     CmiAssert(CkMyPe() == cpStarter);
964     peCount++;
965     // now that all processors have finished, activate callback
966     if (peCount == 2) 
967     {
968       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
969       peCount = 0;
970       thisProxy.report();
971     }
972   }
973
974   // for debugging, report checkpoint info
975   void CkMemCheckPT::report()
976   {
977     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
978     inCheckpointing = 0;
979 #if !CMK_CHKP_ALL
980     int objsize = 0;
981     int len = ckTable.length();
982     for (int i=0; i<len; i++) {
983       CkCheckPTInfo *entry = ckTable[i];
984       CmiAssert(entry);
985       objsize += entry->getSize();
986     }
987     CmiAssert(CpvAccess(procChkptBuf));
988     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
989 #else
990     if(CkMyPe()==0)
991       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
992 #endif
993   }
994
995   /*****************************************************************************
996     RESTART Procedure
997    *****************************************************************************/
998
999   // master processor of two buddies
1000   inline int CkMemCheckPT::isMaster(int buddype)
1001   {
1002 #if 0
1003     int mype = CkMyPe();
1004     //CkPrintf("ismaster: %d %d\n", pe, mype);
1005     if (CkNumPes() - totalFailed() == 2) {
1006       return mype > buddype;
1007     }
1008     for (int i=1; i<CkNumPes(); i++) {
1009       int me = (buddype+i)%CkNumPes();
1010       if (isFailed(me)) continue;
1011       if (me == mype) return 1;
1012       else return 0;
1013     }
1014     return 0;
1015 #else
1016     // smaller one
1017     int mype = CkMyPe();
1018     //CkPrintf("ismaster: %d %d\n", pe, mype);
1019     if (CkNumPes() - totalFailed() == 2) {
1020       return mype < buddype;
1021     }
1022 #if NODE_CHECKPOINT
1023     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1024     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1025 #else
1026       for (int i=1; i<CkNumPes(); i++) {
1027 #endif
1028         int me = (mype+i)%CkNumPes();
1029         if (isFailed(me)) continue;
1030         if (me == buddype) return 1;
1031         else return 0;
1032       }
1033       return 0;
1034 #endif
1035     }
1036
1037
1038
1039 #if 0
1040     // helper class to pup all elements that belong to same ckLocMgr
1041     class ElementDestoryer : public CkLocIterator {
1042       private:
1043         CkLocMgr *locMgr;
1044       public:
1045         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1046         void addLocation(CkLocation &loc) {
1047           CkArrayIndex idx=loc.getIndex();
1048           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1049           loc.destroy();
1050         }
1051     };
1052 #endif
1053
1054     // restore the bitmap vector for LB
1055     void CkMemCheckPT::resetLB(int diepe)
1056     {
1057 #if CMK_LBDB_ON
1058       int i;
1059       char *bitmap = new char[CkNumPes()];
1060       // set processor available bitmap
1061       get_avail_vector(bitmap);
1062       for (i=0; i<failedPes.length(); i++)
1063         bitmap[failedPes[i]] = 0; 
1064       bitmap[diepe] = 0;
1065
1066 #if CK_NO_PROC_POOL
1067       set_avail_vector(bitmap);
1068 #endif
1069
1070       // if I am the crashed pe, rebuild my failedPEs array
1071       if (CkMyNode() == diepe)
1072         for (i=0; i<CkNumPes(); i++) 
1073           if (bitmap[i]==0) failed(i);
1074
1075       delete [] bitmap;
1076 #endif
1077     }
1078
1079     static double restartT;
1080
1081     // in case when failedPe dies, everybody go through its checkpoint table:
1082     // destory all array elements
1083     // recover lost buddies
1084     // reconstruct all array elements from check point data
1085     // called on all processors
1086     void CkMemCheckPT::restart(int diePe)
1087     {
1088 #if CMK_MEM_CHECKPOINT
1089       double curTime = CmiWallTimer();
1090       if (CkMyPe() == diePe){
1091         restartT = CmiWallTimer();
1092         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1093       }
1094       stage = (char*)"resetLB";
1095       startTime = curTime;
1096       if (CkMyPe() == diePe)
1097         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1098
1099 #if CK_NO_PROC_POOL
1100       failed(diePe);    // add into the list of failed pes
1101 #endif
1102       thisFailedPe = diePe;
1103
1104       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1105
1106       inRestarting = 1;
1107
1108       // disable load balancer's barrier
1109       //if (CkMyPe() != diePe) resetLB(diePe);
1110
1111       CKLOCMGR_LOOP(mgr->startInserting(););
1112
1113
1114       if(CmiNumPartition()==1){
1115         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1116       }else{
1117         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1118         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1119       }
1120 #endif
1121     }
1122
1123     // loally remove all array elements
1124     void CkMemCheckPT::removeArrayElements()
1125     {
1126 #if CMK_MEM_CHECKPOINT
1127       int len = ckTable.length();
1128       double curTime = CmiWallTimer();
1129       if (CkMyPe() == thisFailedPe) 
1130         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1131       stage = (char*)"removeArrayElements";
1132       startTime = curTime;
1133
1134       //  if (cpCallback.isInvalid()) 
1135       //          CkPrintf("invalid pe %d\n",CkMyPe());
1136       //          CkAbort("Didn't set restart callback\n");;
1137       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1138
1139       // get rid of all buffering and remote recs
1140       // including destorying all array elements
1141 #if CK_NO_PROC_POOL  
1142       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1143 #else
1144       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1145 #endif
1146       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1147 #endif
1148     }
1149
1150     // flush state in reduction manager
1151     void CkMemCheckPT::resetReductionMgr()
1152     {
1153       if (CkMyPe() == thisFailedPe) 
1154         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1155       int numGroups = CkpvAccess(_groupIDTable)->size();
1156       for(int i=0;i<numGroups;i++) {
1157         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1158         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1159         obj->flushStates();
1160         obj->ckJustMigrated();
1161       }
1162       // reset again
1163       //CpvAccess(_qd)->flushStates();
1164       if(CmiNumPartition()==1){
1165         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1166       }
1167       else
1168         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1169     }
1170
1171     // recover the lost buddies
1172     void CkMemCheckPT::recoverBuddies()
1173     {
1174       int idx;
1175       int len = ckTable.length();
1176       // ready to flush reduction manager
1177       // cannot be CkMemCheckPT::restart because destory will modify states
1178       double curTime = CmiWallTimer();
1179       if (CkMyPe() == thisFailedPe)
1180         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1181       stage = (char *)"recoverBuddies";
1182       if (CkMyPe() == thisFailedPe)
1183         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1184       startTime = curTime;
1185
1186       // recover buddies
1187       expectCount = 0;
1188 #if !CMK_CHKP_ALL
1189       for (idx=0; idx<len; idx++) {
1190         CkCheckPTInfo *entry = ckTable[idx];
1191         if (entry->pNo == thisFailedPe) {
1192 #if CK_NO_PROC_POOL
1193           // find a new buddy
1194           int budPe = BuddyPE(CkMyPe());
1195 #else
1196           int budPe = thisFailedPe;
1197 #endif
1198           CkArrayCheckPTMessage *msg = entry->getCopy();
1199           msg->bud1 = budPe;
1200           msg->bud2 = CkMyPe();
1201           msg->cp_flag = 0;            // not checkpointing
1202           thisProxy[budPe].recoverEntry(msg);
1203           expectCount ++;
1204         }
1205       }
1206 #else
1207       //send to failed pe
1208       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1209 #if CK_NO_PROC_POOL
1210         // find a new buddy
1211         int budPe = BuddyPE(CkMyPe());
1212 #else
1213         int budPe = thisFailedPe;
1214 #endif
1215         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1216         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1217         msg->cp_flag = 0;            // not checkpointing
1218         msg->bud1 = budPe;
1219         msg->bud2 = CkMyPe();
1220         thisProxy[budPe].recoverEntry(msg);
1221         expectCount ++;
1222       }
1223 #endif
1224
1225       if (expectCount == 0) {
1226         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1227       }
1228       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1229     }
1230
1231     void CkMemCheckPT::gotData()
1232     {
1233       ackCount ++;
1234       if (ackCount == expectCount) {
1235         ackCount = 0;
1236         expectCount = -1;
1237         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1238         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1239       }
1240     }
1241
1242     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1243     {
1244
1245       for (int i=0; i<n; i++) {
1246         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1247         mgr->updateLocation(idx[i], nowOnPe);
1248       }
1249       thisProxy[nowOnPe].gotReply();
1250     }
1251
1252     // restore array elements
1253     void CkMemCheckPT::recoverArrayElements()
1254     {
1255       if(CmiMyPartition()==1&&CkMyPe()==0){
1256         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1257         fflush(stdout);
1258       }
1259       double curTime = CmiWallTimer();
1260       int len = ckTable.length();
1261       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1262       stage = (char *)"recoverArrayElements";
1263       if (CkMyPe() == thisFailedPe)
1264         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1265       startTime = curTime;
1266       int flag = 0;
1267       // recover all array elements
1268       int count = 0;
1269
1270 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1271       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1272       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1273 #endif
1274
1275 #if !CMK_CHKP_ALL
1276       for (int idx=0; idx<len; idx++)
1277       {
1278         CkCheckPTInfo *entry = ckTable[idx];
1279 #if CK_NO_PROC_POOL
1280         // the bigger one will do 
1281         //    if (CkMyPe() < entry->pNo) continue;
1282         if (!isMaster(entry->pNo)) continue;
1283 #else
1284         // smaller one do it, which has the original object
1285         if (CkMyPe() == entry->pNo+1 || 
1286             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1287 #endif
1288         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1289
1290         entry->updateBuddy(CkMyPe(), entry->pNo);
1291         CkArrayCheckPTMessage *msg = entry->getCopy();
1292         // gzheng
1293         //thisProxy[CkMyPe()].inmem_restore(msg);
1294         inmem_restore(msg);
1295 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1296         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1297         int homePe = mgr->homePe(msg->index);
1298         if (homePe != CkMyPe()) {
1299           gmap[homePe].push_back(msg->locMgr);
1300           imap[homePe].push_back(msg->index);
1301         }
1302 #endif
1303         CkFreeMsg(msg);
1304         count ++;
1305       }
1306 #else
1307       char * packData;
1308       if(CmiNumPartition()==1){
1309         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1310         packData = (char *)msg->packData;
1311       }
1312       else{
1313         int pointer = CpvAccess(curPointer);
1314         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1315         packData = msg->packData;
1316       }
1317 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1318       recoverAll(packData,gmap,imap);
1319 #else
1320       recoverAll(packData);
1321 #endif
1322 #endif
1323       curTime = CmiWallTimer();
1324 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1325       for (int i=0; i<CkNumPes(); i++) {
1326         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1327           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1328           flag++;       
1329         }
1330       }
1331       delete [] imap;
1332       delete [] gmap;
1333 #endif
1334       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1335
1336       CKLOCMGR_LOOP(mgr->doneInserting(););
1337
1338       // _crashedNode = -1;
1339       CpvAccess(_crashedNode) = -1;
1340 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1341       if (CkMyPe() == 0)
1342         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1343 #else
1344       if(flag == 0)
1345       {
1346         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1347       }
1348 #endif
1349       if(CmiMyPartition()==1&&CkMyPe()==0){
1350         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1351         fflush(stdout);
1352       }
1353     }
1354
1355     void CkMemCheckPT::gotReply(){
1356       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1357     }
1358
1359     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1360       if(CmiMyPartition()==1&&CkMyPe()==0){
1361         CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1362         fflush(stdout);
1363       }
1364 #if CMK_CHKP_ALL
1365       PUP::fromMem p(packData);
1366       int numElements;
1367       p|numElements;
1368       if(p.isUnpacking()){
1369         for(int i=0;i<numElements;i++){
1370           CkGroupID gID;
1371           CkArrayIndex idx;
1372           p|gID;
1373           p|idx;
1374           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1375           int homePe = mgr->homePe(idx);
1376 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1377           mgr->resume(idx,p,CmiTrue,CmiTrue);
1378 #else
1379           if(CmiNumPartition()==1)
1380             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1381           else{
1382             if(CkMyPe()==thisFailedPe){
1383               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1384             }
1385             else{
1386               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1387             }
1388           }
1389 #endif
1390 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1391           homePe = mgr->homePe(idx);
1392           if (homePe != CkMyPe()) {
1393             gmap[homePe].push_back(gID);
1394             imap[homePe].push_back(idx);
1395           }
1396 #endif
1397         }
1398       }
1399 #endif
1400       if(CmiMyPartition()==1&&CkMyPe()==0){
1401         CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1402         fflush(stdout);
1403       }
1404     }
1405
1406
1407     // on every processor
1408     // turn load balancer back on
1409     void CkMemCheckPT::finishUp()
1410     {
1411       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1412       //CKLOCMGR_LOOP(mgr->doneInserting(););
1413
1414       if (CkMyPe() == thisFailedPe)
1415       {
1416         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1417         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1418         fflush(stdout);
1419       }
1420       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1421       inRestarting = 0;
1422
1423 #if CMK_CONVERSE_MPI    
1424       if(CmiNumPartition()!=1){
1425         CpvAccess(recvdProcChkp) = 0;
1426         CpvAccess(recvdArrayChkp) = 0;
1427         CpvAccess(curPointer)^=1;
1428         //notify my replica, restart is done
1429         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1430         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1431         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1432       }
1433       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1434         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1435       }
1436 #endif
1437
1438 #if CK_NO_PROC_POOL
1439 #if NODE_CHECKPOINT
1440       int numnodes = CmiNumPhysicalNodes();
1441 #else
1442       int numnodes = CkNumPes();
1443 #endif
1444       if (numnodes-totalFailed() <=2) {
1445         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1446         _memChkptOn = 0;
1447       }
1448 #endif
1449     }
1450
1451     void CkMemCheckPT::recoverFromSoftFailure()
1452     {
1453       inRestarting = 0;
1454       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1455     }
1456     // called only on 0
1457     void CkMemCheckPT::quiescence(CkCallback &cb)
1458     {
1459       static int pe_count = 0;
1460       pe_count ++;
1461       CmiAssert(CkMyPe() == 0);
1462       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1463       if (pe_count == CkNumPes()) {
1464         pe_count = 0;
1465         cb.send();
1466       }
1467     }
1468
1469     // User callable function - to start a checkpoint
1470     // callback cb is used to pass control back
1471     void CkStartMemCheckpoint(CkCallback &cb)
1472     {
1473 #if CMK_MEM_CHECKPOINT
1474       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1475       if (_memChkptOn == 0) {
1476         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1477         cb.send();
1478         return;
1479       }
1480       if (CkInRestarting()) {
1481         // trying to checkpointing during restart
1482         cb.send();
1483         return;
1484       }
1485       // store user callback and user data
1486       CkMemCheckPT::cpCallback = cb;
1487
1488       // broadcast to start check pointing
1489       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1490       checkptMgr.doItNow(CkMyPe(), cb);
1491 #else
1492       // when mem checkpoint is disabled, invike cb immediately
1493       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1494       cb.send();
1495 #endif
1496     }
1497
1498     void CkRestartCheckPoint(int diePe)
1499     {
1500       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1501       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1502       // broadcast
1503       checkptMgr.restart(diePe);
1504     }
1505
1506     static int _diePE = -1;
1507
1508     // callback function used locally by ccs handler
1509     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1510     {
1511       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1512       CkRestartCheckPoint(_diePE);
1513     }
1514
1515
1516     // called on crashed PE
1517     static void restartBeginHandler(char *msg)
1518     {
1519       CmiFree(msg);
1520 #if CMK_MEM_CHECKPOINT
1521 #if CMK_USE_BARRIER
1522       if(CkMyPe()!=_diePE){
1523         printf("restar begin on %d\n",CkMyPe());
1524         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1525         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1526         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1527       }else{
1528         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1529         CkRestartCheckPointCallback(NULL, NULL);
1530       }
1531 #else
1532       static int count = 0;
1533       CmiAssert(CkMyPe() == _diePE);
1534       count ++;
1535       if (count == CkNumPes()) {
1536         printf("restart begin on %d\n",CkMyPe());
1537         CkRestartCheckPointCallback(NULL, NULL);
1538         count = 0;
1539       }
1540 #endif
1541 #endif
1542     }
1543
1544     extern void _discard_charm_message();
1545     extern void _resume_charm_message();
1546
1547     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1548       return data;
1549     }
1550
1551     static void restartBcastHandler(char *msg)
1552     {
1553 #if CMK_MEM_CHECKPOINT
1554       // advance phase counter
1555       CkMemCheckPT::inRestarting = 1;
1556       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1557       // gzheng
1558       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1559
1560       if (CkMyPe()==_diePE)
1561         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1562
1563       // reset QD counters
1564       /*  gzheng
1565           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1566        */
1567
1568       /*  gzheng
1569           if (CkMyPe()==_diePE)
1570           CkRestartCheckPointCallback(NULL, NULL);
1571        */
1572       CmiFree(msg);
1573
1574       _resume_charm_message();
1575
1576       // reduction
1577       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1578       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1579 #if CMK_USE_BARRIER
1580       //CmiPrintf("before reduce\n");   
1581       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1582       //CmiPrintf("after reduce\n");    
1583 #else
1584       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1585 #endif 
1586       checkpointed = 0;
1587 #endif
1588     }
1589
1590     extern void _initDone();
1591
1592     bool compare(char * buf1, char *buf2){
1593         PUP::checker pchecker(buf1,buf2);
1594                 pchecker.skip();
1595
1596                 int numElements;
1597                 pchecker|numElements;
1598                 for(int i=0;i<numElements;i++){
1599       //for(int i=0;i<1;i++){
1600       CkGroupID gID;
1601       CkArrayIndex idx;
1602
1603       pchecker|gID;
1604       pchecker|idx;
1605
1606       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1607       mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1608       }
1609       return pchecker.getResult();
1610       //return true;
1611     }
1612
1613     static void recvRemoteChkpHandler(char *msg){
1614       envelope *env = (envelope *)msg;
1615       CkUnpackMessage(&env);
1616       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1617       if(CpvAccess(recvdLocal)==1){
1618         int pointer = CpvAccess(curPointer);
1619         int size = CpvAccess(chkpBuf)[pointer]->len;
1620         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1621           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1622         }else
1623         {
1624           CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1625           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1626         }
1627         delete chkpMsg;
1628         if(CkMyPe()==0)
1629           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1630       }else{
1631         CpvAccess(recvdRemote) = 1;
1632         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1633         CpvAccess(buddyBuf) = chkpMsg;
1634       }  
1635     }
1636
1637     static void replicaRecoverHandler(char *msg){
1638       CpvAccess(_remoteCrashedNode) = -1;
1639       CkMemCheckPT::replicaAlive = 1;
1640       //fflush(stdout);
1641       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1642       bool ret = true;
1643       CpvAccess(remoteChkpDone) = 1;
1644       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1645       CmiFree(msg);
1646   
1647     }
1648     static void replicaChkpDoneHandler(char *msg){
1649       CpvAccess(remoteChkpDone) = 1;
1650       if(CpvAccess(localChkpDone) == 1)
1651         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1652       CmiFree(msg);
1653     }
1654
1655     static void replicaDieHandler(char * msg){
1656 #if CMK_CONVERSE_MPI    
1657       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1658       CpvAccess(_remoteCrashedNode) = diePe;
1659       CkMemCheckPT::replicaAlive = 0;
1660       if(CkMyPe()==diePe){
1661         CmiPrintf("pe %d in replicad word die\n",diePe);
1662         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1663         fflush(stdout);
1664       }
1665       find_spare_mpirank(diePe,CmiMyPartition()^1);
1666 #endif
1667       //broadcast to my partition to get local max iter
1668       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1669       CmiFree(msg);
1670     }
1671
1672
1673     static void replicaDieBcastHandler(char *msg){
1674       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1675       CpvAccess(_remoteCrashedNode) = diePe;
1676       CkMemCheckPT::replicaAlive = 0;
1677       CmiFree(msg);
1678     }
1679
1680     static void recoverRemoteProcDataHandler(char *msg){
1681       if(CmiMyPartition()==1&&CkMyPe()==0){
1682         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1683         fflush(stdout);
1684       }
1685       envelope *env = (envelope *)msg;
1686       CkUnpackMessage(&env);
1687       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1688
1689       //store the checkpoint
1690       int pointer = procMsg->pointer;
1691
1692
1693       if(CkMyPe()==CpvAccess(_crashedNode)){
1694         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1695         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1696         PUP::fromMem p(procMsg->packData);
1697         _handleProcData(p,CmiTrue);
1698         _initDone();
1699       }
1700       else{
1701         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1702         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1703         //_handleProcData(p,CmiFalse);
1704       }
1705       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1706       CKLOCMGR_LOOP(mgr->startInserting(););
1707
1708       CpvAccess(recvdProcChkp) =1;
1709       if(CpvAccess(recvdArrayChkp)==1){
1710         _resume_charm_message();
1711         _diePE = CpvAccess(_crashedNode);
1712         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1713         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1714         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1715       }
1716       if(CmiMyPartition()==1&&CkMyPe()==0){
1717         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1718         fflush(stdout);
1719       }
1720     }
1721
1722     static void recoverRemoteArrayDataHandler(char *msg){
1723       envelope *env = (envelope *)msg;
1724       CkUnpackMessage(&env);
1725       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1726       if(CmiMyPartition()==1&&CkMyPe()==0){
1727         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1728         fflush(stdout);
1729       }
1730
1731       //store the checkpoint
1732       int pointer = chkpMsg->pointer;
1733       CpvAccess(curPointer) = pointer;
1734       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1735       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1736       CpvAccess(recvdArrayChkp) =1;
1737       CkMemCheckPT::inRestarting = 1;
1738       if(CpvAccess(recvdProcChkp) == 1){
1739         _resume_charm_message();
1740         _diePE = CpvAccess(_crashedNode);
1741         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1742         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1743         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1744         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1745       }
1746       if(CmiMyPartition()==1&&CkMyPe()==0){
1747         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1748         fflush(stdout);
1749       }
1750     }
1751
1752     static void recvPhaseHandler(char * msg)
1753     {
1754       CpvAccess(_curRestartPhase)--;
1755       CkMemCheckPT::inRestarting = 1;
1756       CmiFree(msg);
1757     }
1758     // called on crashed processor
1759     static void recoverProcDataHandler(char *msg)
1760     {
1761 #if CMK_MEM_CHECKPOINT
1762       int i;
1763       envelope *env = (envelope *)msg;
1764       CkUnpackMessage(&env);
1765       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1766       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1767       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1768       PUP::fromMem p(procMsg->packData);
1769       _handleProcData(p,CmiTrue);
1770
1771       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1772       // gzheng
1773       CKLOCMGR_LOOP(mgr->startInserting(););
1774
1775       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1776       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1777       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1778       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1779
1780       _initDone();
1781       //   CpvAccess(_qd)->flushStates();
1782       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1783 #endif
1784     }
1785     //for replica, got the phase number from my neighbor processor in the current partition
1786     static void askPhaseHandler(char *msg)
1787     {
1788 #if CMK_MEM_CHECKPOINT
1789       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1790       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1791       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1792       CmiSetHandler(msg, recvPhaseHandlerIdx);
1793       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1794 #endif
1795     }
1796     // called on its backup processor
1797     // get backup message buffer and sent to crashed processor
1798     static void askProcDataHandler(char *msg)
1799     {
1800 #if CMK_MEM_CHECKPOINT
1801       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1802       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1803       if (CpvAccess(procChkptBuf) == NULL)  {
1804         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1805         CkAbort("no checkpoint found");
1806       }
1807       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1808       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1809
1810       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1811
1812       CkPackMessage(&env);
1813       CmiSetHandler(env, recoverProcDataHandlerIdx);
1814       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1815       CpvAccess(procChkptBuf) = NULL;
1816       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1817 #endif
1818     }
1819
1820     // called on PE 0
1821     void qd_callback(void *m)
1822     {
1823       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1824         fflush(stdout);
1825       CkFreeMsg(m);
1826       if(CmiNumPartition()==1){
1827 #ifdef CMK_SMP
1828         for(int i=0;i<CmiMyNodeSize();i++){
1829           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1830           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1831           CmiSetHandler(msg, askProcDataHandlerIdx);
1832           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1833           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1834         }
1835         return;
1836 #endif
1837         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1838         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1839         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1840         CmiSetHandler(msg, askProcDataHandlerIdx);
1841         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1842         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1843       }
1844       else{
1845         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1846         CmiSetHandler(msg, recvPhaseHandlerIdx);
1847         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1848       }
1849     }
1850
1851     // on crashed node
1852     void CkMemRestart(const char *dummy, CkArgMsg *args)
1853     {
1854 #if CMK_MEM_CHECKPOINT
1855       _diePE = CmiMyNode();
1856       CpvAccess( _crashedNode )= CmiMyNode();
1857       CkMemCheckPT::inRestarting = 1;
1858       _discard_charm_message();
1859
1860       /*if(CmiMyRank()==0){
1861         CkCallback cb(qd_callback);
1862         CkStartQD(cb);
1863         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1864         }*/
1865       if(CmiNumPartition()==1){
1866         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1867         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1868         restartT = CmiWallTimer();
1869         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1870         fflush(stdout);
1871         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1872         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1873         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1874         CmiSetHandler(msg, askProcDataHandlerIdx);
1875         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1876         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1877       }
1878       else{
1879         CkCallback cb(qd_callback);
1880         CkStartQD(cb);
1881       }
1882 #else
1883       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1884 #endif
1885     }
1886
1887     // can be called in other files
1888     // return true if it is in restarting
1889     extern "C"
1890       int CkInRestarting()
1891       {
1892 #if CMK_MEM_CHECKPOINT
1893         if (CpvAccess( _crashedNode)!=-1) return 1;
1894         // gzheng
1895         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1896         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1897         return CkMemCheckPT::inRestarting;
1898 #else
1899         return 0;
1900 #endif
1901       }
1902
1903     extern "C"
1904       int CkReplicaAlive()
1905       {
1906         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1907         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1908         return CkMemCheckPT::replicaAlive;
1909
1910         /*if(CkMemCheckPT::replicaDead==1)
1911           return 0;
1912           else          
1913           return 1;*/
1914       }
1915
1916     extern "C"
1917       int CkInCheckpointing()
1918       {
1919         return CkMemCheckPT::inCheckpointing;
1920       }
1921
1922     extern "C"
1923       void CkSetInLdb(){
1924 #if CMK_MEM_CHECKPOINT
1925         CkMemCheckPT::inLoadbalancing = 1;
1926 #endif
1927       }
1928
1929     extern "C"
1930       int CkInLdb(){
1931 #if CMK_MEM_CHECKPOINT
1932         return CkMemCheckPT::inLoadbalancing;
1933 #endif
1934         return 0;
1935       }
1936
1937     extern "C"
1938       void CkResetInLdb(){
1939 #if CMK_MEM_CHECKPOINT
1940         CkMemCheckPT::inLoadbalancing = 0;
1941 #endif
1942       }
1943
1944     /*****************************************************************************
1945       module initialization
1946      *****************************************************************************/
1947
1948     static int arg_where = CkCheckPoint_inMEM;
1949
1950 #if CMK_MEM_CHECKPOINT
1951     void init_memcheckpt(char **argv)
1952     {
1953       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1954         arg_where = CkCheckPoint_inDISK;
1955       }
1956       // initiliazing _crashedNode variable
1957       CpvInitialize(int, _crashedNode);
1958       CpvInitialize(int, _remoteCrashedNode);
1959       CpvAccess(_crashedNode) = -1;
1960       CpvAccess(_remoteCrashedNode) = -1;
1961       init_FI(argv);
1962
1963     }
1964 #endif
1965
1966     class CkMemCheckPTInit: public Chare {
1967       public:
1968         CkMemCheckPTInit(CkArgMsg *m) {
1969 #if CMK_MEM_CHECKPOINT
1970           if (arg_where == CkCheckPoint_inDISK) {
1971             CkPrintf("Charm++> Double-disk Checkpointing. \n");
1972           }
1973           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1974           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1975 #endif
1976         }
1977     };
1978
1979     static void notifyHandler(char *msg)
1980     {
1981 #if CMK_MEM_CHECKPOINT
1982       CmiFree(msg);
1983       /* immediately increase restart phase to filter old messages */
1984       CpvAccess(_curRestartPhase) ++;
1985       CpvAccess(_qd)->flushStates();
1986       _discard_charm_message();
1987
1988 #endif
1989     }
1990
1991     extern "C"
1992       void notify_crash(int node)
1993       {
1994 #ifdef CMK_MEM_CHECKPOINT
1995         CpvAccess( _crashedNode) = node;
1996 #ifdef CMK_SMP
1997         for(int i=0;i<CkMyNodeSize();i++){
1998           CpvAccessOther(_crashedNode,i)=node;
1999         }
2000 #endif
2001         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2002         CkMemCheckPT::inRestarting = 1;
2003
2004         // this may be in interrupt handler, send a message to reset QD
2005         int pe = CmiNodeFirst(CkMyNode());
2006         for(int i=0;i<CkMyNodeSize();i++){
2007           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2008           CmiSetHandler(msg, notifyHandlerIdx);
2009           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2010         }
2011 #endif
2012       }
2013
2014     extern "C" void (*notify_crash_fn)(int node);
2015
2016
2017 #if CMK_CONVERSE_MPI
2018     void buddyDieHandler(char *msg)
2019     {
2020 #if CMK_MEM_CHECKPOINT
2021       // notify
2022       CkMemCheckPT::inRestarting = 1;
2023       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2024       notify_crash(diepe);
2025       // send message to crash pe to let it restart
2026       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2027       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2028       int buddy = obj->BuddyPE(CmiMyPe());
2029       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2030       if (buddy == diepe)  {
2031         mpi_restart_crashed(diepe, newrank);
2032       }
2033 #endif
2034     }
2035
2036     void pingHandler(void *msg)
2037     {
2038       lastPingTime = CmiWallTimer();
2039       CmiFree(msg);
2040     }
2041
2042     void pingCheckHandler()
2043     {
2044 #if CMK_MEM_CHECKPOINT
2045       double now = CmiWallTimer();
2046       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2047         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2048         int i, pe, buddy;
2049         // tell everyone the buddy dies
2050         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2051         for (i = 1; i < CmiNumPes(); i++) {
2052           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2053           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2054         }
2055         buddy = pe;
2056         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2057         fflush(stdout);
2058         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2059         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2060         CmiSetHandler(msg, buddyDieHandlerIdx);
2061         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2062         //send to everyone in the other world
2063         if(CmiNumPartition()!=1){
2064           for(int i=0;i<CmiNumPes();i++){
2065             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2066             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2067             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2068             //fflush(stdout);
2069             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2070             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2071           }
2072         }
2073       }
2074         else 
2075           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2076 #endif
2077       }
2078
2079       void pingBuddy()
2080       {
2081 #if CMK_MEM_CHECKPOINT
2082         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2083         if (obj) {
2084           int buddy = obj->BuddyPE(CkMyPe());
2085           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2086           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2087           CmiSetHandler(msg, pingHandlerIdx);
2088           CmiGetRestartPhase(msg) = 9999;
2089           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2090         }
2091         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2092 #endif
2093       }
2094 #endif
2095
2096       // initproc
2097       void CkRegisterRestartHandler( )
2098       {
2099 #if CMK_MEM_CHECKPOINT
2100         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2101         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2102         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2103         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2104         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2105
2106         //for replica
2107         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2108         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2109         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2110         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2111         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2112         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2113         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2114         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2115         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2116
2117 #if CMK_CONVERSE_MPI
2118         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2119         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2120         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2121 #endif
2122
2123         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2124         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2125         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2126         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2127         CpvInitialize(int,curPointer);
2128         CpvInitialize(int,recvdLocal);
2129         CpvInitialize(int,localChkpDone);
2130         CpvInitialize(int,remoteChkpDone);
2131         CpvInitialize(int,recvdRemote);
2132         CpvInitialize(int,recvdProcChkp);
2133         CpvInitialize(int,recvdArrayChkp);
2134
2135         CpvAccess(procChkptBuf) = NULL;
2136         CpvAccess(buddyBuf) = NULL;
2137         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2138         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2139         CpvAccess(chkpBuf)[0] = NULL;
2140         CpvAccess(chkpBuf)[1] = NULL;
2141         CpvAccess(localProcChkpBuf)[0] = NULL;
2142         CpvAccess(localProcChkpBuf)[1] = NULL;
2143
2144         CpvAccess(curPointer) = 0;
2145         CpvAccess(recvdLocal) = 0;
2146         CpvAccess(localChkpDone) = 0;
2147         CpvAccess(remoteChkpDone) = 0;
2148         CpvAccess(recvdRemote) = 0;
2149         CpvAccess(recvdProcChkp) = 0;
2150         CpvAccess(recvdArrayChkp) = 0;
2151
2152         notify_crash_fn = notify_crash;
2153
2154 #if ! CMK_CONVERSE_MPI
2155         // print pid to kill
2156         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2157         //  sleep(4);
2158 #endif
2159 #endif
2160       }
2161
2162
2163       extern "C"
2164         int CkHasCheckpoints()
2165         {
2166           return checkpointed;
2167         }
2168
2169       /// @todo: the following definitions should be moved to a separate file containing
2170       // structures and functions about fault tolerance strategies
2171
2172       /**
2173        *  * @brief: function for killing a process                                             
2174        *   */
2175 #ifdef CMK_MEM_CHECKPOINT
2176 #if CMK_HAS_GETPID
2177       void killLocal(void *_dummy,double curWallTime){
2178         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2179         if(CmiWallTimer()<killTime-1){
2180           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2181         }else{ 
2182 #if CMK_CONVERSE_MPI
2183           CkDieNow();
2184 #else 
2185           kill(getpid(),SIGKILL);                                               
2186 #endif
2187         }              
2188       } 
2189 #else
2190       void killLocal(void *_dummy,double curWallTime){
2191         CmiAbort("kill() not supported!");
2192       }
2193 #endif
2194 #endif
2195
2196 #ifdef CMK_MEM_CHECKPOINT
2197       /**
2198        * @brief: reads the file with the kill information
2199        */
2200       void readKillFile(){
2201         FILE *fp=fopen(killFile,"r");
2202         if(!fp){
2203           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2204           return;
2205         }
2206         int proc;
2207         double sec;
2208         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2209           if(proc == CkMyNode() && CkMyRank() == 0){
2210             killTime = CmiWallTimer()+sec;
2211             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2212             CcdCallFnAfter(killLocal,NULL,sec*1000);
2213           }
2214         }
2215         fclose(fp);
2216       }
2217
2218 #if ! CMK_CONVERSE_MPI
2219       void CkDieNow()
2220       {
2221 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2222         // ignored for non-mpi version
2223         CmiPrintf("[%d] die now.\n", CmiMyPe());
2224         killTime = CmiWallTimer()+0.001;
2225         CcdCallFnAfter(killLocal,NULL,1);
2226 #endif
2227       }
2228 #endif
2229
2230 #endif
2231
2232 #include "CkMemCheckpoint.def.h"
2233
2234