small change in the previous commit
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92 unsigned int failureSeed;
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // flag for failure distributiona
104 char * failureDist;
105 // Meantimr between failure
106 int MTBF;
107 int SMTBF;
108 //shape parameter for Weibull distribution
109 double beta;
110 double alpha;
111 double s_alpha;
112 // variable for storing the killing time
113 double killTime=0.0;
114 extern void killLocal(void *_dummy,double curWallTime);
115 extern void sendKillNotify(void *_dummy,double curWallTime);
116 extern void injectSoftFailure(void *_dummy,double curWallTime);
117 #endif
118
119 #ifdef CKLOCMGR_LOOP
120 #undef CKLOCMGR_LOOP
121 #endif
122 // loop over all CkLocMgr and do "code"
123 #define  CKLOCMGR_LOOP(code)    {       \
124   int numGroups = CkpvAccess(_groupIDTable)->size();    \
125   for(int i=0;i<numGroups;i++) {        \
126     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
127     if(obj->isLocMgr())  {      \
128       CkLocMgr *mgr = (CkLocMgr*)obj;   \
129       code      \
130     }   \
131   }     \
132 }
133
134 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
135 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
136 CpvDeclare(CkCheckPTMessage**, chkpBuf);
137 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
138 //store the checkpoint of the buddy to compare
139 //do not need the whole msg, can be the checksum
140 CpvDeclare(CkCheckPTMessage*, buddyBuf);
141 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
142 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
143 //pointer of the checkpoint going to be written
144 CpvDeclare(int, curPointer);
145 CpvDeclare(int, recvdRemote);
146 CpvDeclare(int, recvdLocal);
147 CpvDeclare(int, localChkpDone);
148 CpvDeclare(int, remoteChkpDone);
149 CpvDeclare(int, remoteStarted);
150 CpvDeclare(int, localStarted);
151 CpvDeclare(int, remoteReady);
152 CpvDeclare(int, localReady);
153 CpvDeclare(int, recvdArrayChkp);
154 CpvDeclare(int, recvdProcChkp);
155 CpvDeclare(int, localChecksum);
156 CpvDeclare(int, remoteChecksum);
157
158 bool compare(char * buf1, char * buf2);
159 int getChecksum(char * buf);
160 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
161 // Converse function handles
162 static int askPhaseHandlerIdx;
163 static int recvPhaseHandlerIdx;
164 static int askProcDataHandlerIdx;
165 static int askRecoverDataHandlerIdx;
166 static int restartBcastHandlerIdx;
167 static int recoverProcDataHandlerIdx;
168 static int restartBeginHandlerIdx;
169 static int recvRemoteChkpHandlerIdx;
170 static int replicaDieHandlerIdx;
171 static int replicaChkpStartHandlerIdx;
172 static int replicaDieBcastHandlerIdx;
173 static int replicaRecoverHandlerIdx;
174 static int replicaChkpDoneHandlerIdx;
175 static int replicaBeginFailureInjectionHandlerIdx;
176 static int recoverRemoteProcDataHandlerIdx;
177 static int recoverRemoteArrayDataHandlerIdx;
178 static int notifyHandlerIdx;
179 static int replicaDyingNotifyHandlerIdx;
180 // compute the backup processor
181 // FIXME: avoid crashed processors
182 #if CMK_CONVERSE_MPI
183 static int pingHandlerIdx;
184 static int pingCheckHandlerIdx;
185 static int buddyDieHandlerIdx;
186 static double lastPingTime = -1;
187
188 extern "C" void mpi_restart_crashed(int pe, int rank);
189 extern "C" int  find_spare_mpirank(int pe,int partition);
190
191 void pingBuddy();
192 void pingCheckHandler();
193
194 #endif
195 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
196
197 inline int CkMemCheckPT::BuddyPE(int pe)
198 {
199   int budpe;
200 #if NODE_CHECKPOINT
201   // buddy is the processor with same rank on the next physical node
202   int r1 = CmiPhysicalRank(pe);
203   int budnode = CmiPhysicalNodeID(pe);
204   do {
205     budnode = (budnode+1)%CmiNumPhysicalNodes();
206     int *pelist;
207     int num;
208     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
209     budpe = pelist[r1 % num];
210   } while (isFailed(budpe));
211   if (budpe == pe) {
212     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
213     CmiAbort("Failed to find a buddy processor");
214   }
215 #else
216   budpe = pe;
217   budpe = (budpe+1)%CkNumPes();
218 #endif
219   return budpe;
220 }
221
222 // called in array element constructor
223 // choose and register with 2 buddies for checkpoiting 
224 #if CMK_MEM_CHECKPOINT
225 void ArrayElement::init_checkpt() {
226   if (_memChkptOn == 0) return;
227   if (CkInRestarting()) {
228     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
229   }
230   // only master init checkpoint
231   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
232
233   budPEs[0] = CkMyPe();
234   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
235   CmiAssert(budPEs[0] != budPEs[1]);
236   // inform checkPTMgr
237   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
238   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
239   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
240 }
241 #endif
242
243 // entry function invoked by checkpoint mgr asking for checkpoint data
244 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
245 #if CMK_MEM_CHECKPOINT
246   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
247   //char index[128];   thisIndexMax.sprint(index);
248   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
249   CkLocMgr *locMgr = thisArray->getLocMgr();
250   CmiAssert(myRec!=NULL);
251   int size;
252   {
253     PUP::sizer p;
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255     size = p.size();
256   }
257   int packSize = size/sizeof(double) +1;
258   CkArrayCheckPTMessage *msg =
259     new (packSize, 0) CkArrayCheckPTMessage;
260   msg->len = size;
261   msg->index =thisIndexMax;
262   msg->aid = thisArrayID;
263   msg->locMgr = locMgr->getGroupID();
264   msg->cp_flag = 1;
265   {
266     PUP::toMem p(msg->packData);
267     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
268   }
269
270   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
271   checkptMgr.recvData(msg, 2, budPEs);
272   delete m;
273 #endif
274 }
275
276 // checkpoint holder class - for memory checkpointing
277 class CkMemCheckPTInfo: public CkCheckPTInfo
278 {
279   CkArrayCheckPTMessage *ckBuffer;
280   public:
281   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
282     CkCheckPTInfo(a, loc, idx, pno)
283   {
284     ckBuffer = NULL;
285   }
286   ~CkMemCheckPTInfo() 
287   {
288     if (ckBuffer) delete ckBuffer; 
289   }
290   inline void updateBuffer(CkArrayCheckPTMessage *data) 
291   {
292     CmiAssert(data!=NULL);
293     if (ckBuffer) delete ckBuffer;
294     ckBuffer = data;
295   }    
296   inline CkArrayCheckPTMessage * getCopy()
297   {
298     if (ckBuffer == NULL) {
299       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
300       CmiAbort("Abort!");
301     }
302     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
303   }     
304   inline void updateBuddy(int b1, int b2) {
305     CmiAssert(ckBuffer);
306     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
307     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
308     CmiAssert(pNo != CkMyPe());
309   }
310   inline int getSize() { 
311     CmiAssert(ckBuffer);
312     return ckBuffer->len; 
313   }
314 };
315
316 // checkpoint holder class - for in-disk checkpointing
317 class CkDiskCheckPTInfo: public CkCheckPTInfo 
318 {
319   char *fname;
320   int bud1, bud2;
321   int len;                      // checkpoint size
322   public:
323   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
324   {
325 #if CMK_USE_MKSTEMP
326     fname = new char[64];
327     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
328     mkstemp(fname);
329 #else
330     fname=tmpnam(NULL);
331 #endif
332     bud1 = bud2 = -1;
333     len = 0;
334   }
335   ~CkDiskCheckPTInfo() 
336   {
337     remove(fname);
338   }
339   inline void updateBuffer(CkArrayCheckPTMessage *data) 
340   {
341     double t = CmiWallTimer();
342     // unpack it
343     envelope *env = UsrToEnv(data);
344     CkUnpackMessage(&env);
345     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
346     FILE *f = fopen(fname,"wb");
347     PUP::toDisk p(f);
348     CkPupMessage(p, (void **)&data);
349     // delay sync to the end because otherwise the messages are blocked
350     //    fsync(fileno(f));
351     fclose(f);
352     bud1 = data->bud1;
353     bud2 = data->bud2;
354     len = data->len;
355     delete data;
356     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
357   }
358   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
359   {
360     CkArrayCheckPTMessage *data;
361     FILE *f = fopen(fname,"rb");
362     PUP::fromDisk p(f);
363     CkPupMessage(p, (void **)&data);
364     fclose(f);
365     data->bud1 = bud1;                          // update the buddies
366     data->bud2 = bud2;
367     return data;
368   }
369   inline void updateBuddy(int b1, int b2) {
370     bud1 = b1; bud2 = b2;
371     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
372     CmiAssert(pNo != CkMyPe());
373   }
374   inline int getSize() { 
375     return len; 
376   }
377 };
378
379 CkMemCheckPT::CkMemCheckPT(int w)
380 {
381   int numnodes = 0;
382 #if NODE_CHECKPOINT
383   numnodes = CmiNumPhysicalNodes();
384 #else
385   numnodes = CkNumPes();
386 #endif
387 #if CK_NO_PROC_POOL
388   if (numnodes <= 2)
389 #else
390     if (numnodes  == 1)
391 #endif
392     {
393       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
394       _memChkptOn = 0;
395     }
396   inRestarting = 0;
397   inCheckpointing = 0;
398   recvCount = peCount = 0;
399   ackCount = 0;
400   expectCount = -1;
401   where = w;
402   replicaAlive = 1;
403   notifyReplica = 0;
404 #if CMK_CONVERSE_MPI
405   void pingBuddy();
406   void pingCheckHandler();
407   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
408   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
409 #endif
410   chkpTable[0] = NULL;
411   chkpTable[1] = NULL;
412   maxIter = -1;
413   recvIterCount = 0;
414   localDecided = false;
415   softFailureInjected = false;
416   if(killFlag == 2){
417     localSeed = failureSeed;
418     softLocalSeed = failureSeed*2;
419     //only inject failure to pe [0][1]
420     if(CkMyPe()==0 && CmiMyPartition()==0)
421       thisProxy[CkMyPe()].generateFailure();
422     
423     if(SMTBF!=-1 && CmiMyPartition()==1 && CkMyPe()==0){
424       thisProxy[CkMyPe()].generateSoftFailure();
425     }
426   }
427 }
428
429 void CkMemCheckPT::replicaInjectFailure(){
430   char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
431   CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
432   CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
433 }
434
435 void CkMemCheckPT::generateFailure(){
436   int rand3 = rand_r(&localSeed);
437   double sec;
438   if(strcmp(failureDist,"E")==0)
439     sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
440   else if(strcmp(failureDist,"W")==0)
441     sec = alpha*pow(-log(1.0f - ((double)rand3)/(long long int)(RAND_MAX)),1/beta);
442   killTime = CmiWallTimer()+sec;
443   printf("[%d][%d] inject hard failure after %.6lf s (MEMCKPT)\n",CmiMyPartition(),CkMyPe(),sec);
444   CcdCallFnAfter(sendKillNotify,NULL,(sec-1)*1000);
445 }
446
447 void sendKillNotify(void *_dummy,double curWallTime){
448   if(CkInCheckpointing()||CpvAccess(localStarted)==1||CkInRestarting()){
449     //in checkpointing or restart, delaying sending the notify
450     CkPrintf("[%d][%d]in checkpointing, recheck after 0.5s at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
451     CcdCallFnAfter(sendKillNotify,NULL,500);
452   }else{
453     CkMemCheckPT::aboutToDie =  1;
454     char * msg1 = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
455     CmiSetHandler(msg1, replicaDyingNotifyHandlerIdx);
456     CmiRemoteSyncSendAndFree(1,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg1);
457     
458     //now it can die
459     double sec = 0.01;
460     if(CmiWallTimer()<killTime){
461       sec=killTime-CmiWallTimer();  
462     }
463     CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
464     checkptMgr[1].killAfter(sec);
465     
466   }
467 }
468
469 void CkMemCheckPT::killAfter(double sec){
470   killTime = CmiWallTimer()+sec;
471   printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
472   CcdCallFnAfter(killLocal,NULL,sec*1000);
473 }
474
475 void replicaDyingNotify(char * msg){
476   CkMemCheckPT::aboutToDie =  1;
477   CmiFree(msg);
478 }
479
480 void CkMemCheckPT::generateSoftFailure(){
481   int rand = rand_r(&softLocalSeed);
482   double sec;
483   if(strcmp(failureDist,"E")==0){
484     sec = -log(1.0f - ((double)rand)/(long long int)(RAND_MAX))*SMTBF;
485   }
486   else{
487     sec = s_alpha*pow(-log(1.0f - ((double)rand)/(long long int)(RAND_MAX)),1/beta);
488   }
489   printf("[%d][%d] inject soft failure after %.6lf s (MEMCKPT)\n",CmiMyPartition(),CkMyPe(),sec);
490   CcdCallFnAfter(injectSoftFailure,NULL,sec*1000);
491 }
492
493 CkMemCheckPT::~CkMemCheckPT()
494 {
495   int len = ckTable.length();
496   for (int i=0; i<len; i++) {
497     delete ckTable[i];
498   }
499 }
500
501 void CkMemCheckPT::pup(PUP::er& p) 
502
503   CBase_CkMemCheckPT::pup(p); 
504   p|cpStarter;
505   p|thisFailedPe;
506   p|failedPes;
507   p|ckCheckPTGroupID;           // recover global variable
508   p|cpCallback;                 // store callback
509   p|where;                      // where to checkpoint
510   p|peCount;
511   p|localSeed;
512   p|softLocalSeed;
513   if (p.isUnpacking()) {
514     recvCount = 0;
515 #if CMK_CONVERSE_MPI
516     void pingBuddy();
517     void pingCheckHandler();
518     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
519     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
520 #endif
521     maxIter = -1;
522     recvIterCount = 0;
523     localDecided = false;
524   }
525 }
526
527 void CkMemCheckPT::getIter(){
528   localDecided = true;
529   localMaxIter = maxIter+1;
530   if(CkMyPe()==0){
531     CkPrintf("local max iter is %d\n",localMaxIter);
532   }
533   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
534   int elemCount = CkCountChkpSyncElements();
535   if(CkMyPe()==0)
536     startTime = CmiWallTimer();
537   if(elemCount == 0){
538     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
539   }
540 }
541
542 //when one replica failes, decide the next chkp iter
543
544 void CkMemCheckPT::recvIter(int iter){
545   if(maxIter<iter){
546     maxIter=iter;
547   }
548 }
549
550 void CkMemCheckPT::recvMaxIter(int iter){
551   localDecided = false;
552   if(CkMyPe()==0)
553     CkPrintf("checkpoint iteration is %d\n",iter);
554   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
555 }
556
557 void CkMemCheckPT::reachChkpIter(){
558   recvIterCount++;
559   elemCount = CkCountChkpSyncElements();
560   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
561   if(recvIterCount == elemCount){
562     recvIterCount = 0;
563     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
564   }
565 }
566
567 void CkMemCheckPT::startChkp(){
568   if(CkInCheckpointing()){
569     return;
570   }
571   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
572   CkStartMemCheckpoint(cpCallback);
573 }
574
575 // called by checkpoint mgr to restore an array element
576 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
577 {
578 #if CMK_MEM_CHECKPOINT
579   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
580   // m->index.print();
581   PUP::fromMem p(m->packData);
582   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
583   CmiAssert(mgr);
584 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
585   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
586 #else
587   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
588 #endif
589
590   // find a list of array elements bound together
591   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
592   CmiAssert(elt);
593   CkLocRec_local *rec = elt->myRec;
594   CkVec<CkMigratable *> list;
595   mgr->migratableList(rec, list);
596   CmiAssert(list.length() > 0);
597   for (int l=0; l<list.length(); l++) {
598     elt = (ArrayElement *)list[l];
599     elt->budPEs[0] = m->bud1;
600     elt->budPEs[1] = m->bud2;
601     //    reset, may not needed now
602     // for now.
603     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
604       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
605       if (c) c->redNo = 0;
606     }
607   }
608 #endif
609 }
610
611 // return 1 if pe is a crashed processor
612 int CkMemCheckPT::isFailed(int pe)
613 {
614   for (int i=0; i<failedPes.length(); i++)
615     if (failedPes[i] == pe) return 1;
616   return 0;
617 }
618
619 // add pe into history list of all failed processors
620 void CkMemCheckPT::failed(int pe)
621 {
622   if (isFailed(pe)) return;
623   failedPes.push_back(pe);
624 }
625
626 int CkMemCheckPT::totalFailed()
627 {
628   return failedPes.length();
629 }
630
631 // create an checkpoint entry for array element of aid with index.
632 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
633 {
634   // error check, no duplicate
635   int idx, len = ckTable.size();
636   for (idx=0; idx<len; idx++) {
637     CkCheckPTInfo *entry = ckTable[idx];
638     if (index == entry->index) {
639       if (loc == entry->locMgr) {
640         // bindTo array elements
641         return;
642       }
643       // for array inheritance, the following check may fail
644       // because ArrayElement constructor of all superclasses are called
645       if (aid == entry->aid) {
646         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
647         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
648       }
649     }
650   }
651   CkCheckPTInfo *newEntry;
652   if (where == CkCheckPoint_inMEM)
653     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
654   else
655     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
656   ckTable.push_back(newEntry);
657   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
658 }
659
660 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
661 {
662 #if !CMK_CHKP_ALL       
663   int buddy = msg->bud1;
664   if (buddy == CkMyPe()) buddy = msg->bud2;
665   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
666   recvData(msg);
667   // ack
668   thisProxy[buddy].gotData();
669 #else
670   chkpTable[0] = NULL;
671   chkpTable[1] = NULL;
672   recvArrayCheckpoint(msg);
673   thisProxy[msg->bud2].gotData();
674 #endif
675 }
676
677 void CkMemCheckPT::chkpLocalStart()
678 {
679   CpvAccess(localStarted) = 1;
680 }
681
682 // loop through my checkpoint table and ask checkpointed array elements
683 // to send me checkpoint data.
684 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
685 void CkMemCheckPT::doItNow(int starter)
686 {
687   checkpointed = 1;
688   //cpCallback = cb;
689   cpStarter = starter;
690   inCheckpointing = 1;
691   if (CkMyPe() == cpStarter) {
692     startTime = CmiWallTimer();
693     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
694   }
695 #if CMK_CONVERSE_MPI
696   if(CmiNumPartition()==1)
697 #endif    
698   {
699 #if !CMK_CHKP_ALL
700     int len = ckTable.length();
701     for (int i=0; i<len; i++) {
702       CkCheckPTInfo *entry = ckTable[i];
703       // always let the bigger number processor send request
704       //if (CkMyPe() < entry->pNo) continue;
705       // always let the smaller number processor send request, may on same proc
706       if (!isMaster(entry->pNo)) continue;
707       // call inmem_checkpoint to the array element, ask it to send
708       // back checkpoint data via recvData().
709       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
710       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
711     }
712     // if my table is empty, then I am done
713     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
714 #else
715     startArrayCheckpoint();
716 #endif
717     // pack and send proc level data
718     sendProcData();
719   }
720 #if CMK_CONVERSE_MPI
721   else
722   {
723     startCheckpoint();
724   }
725 #endif
726 }
727
728 class MemElementPacker : public CkLocIterator{
729   private:
730     //    CkLocMgr *locMgr;
731     PUP::er &p;
732     std::map<CkHashCode,CkLocation> arrayMap;
733   public:
734     MemElementPacker(PUP::er &p_):p(p_){};
735     void addLocation(CkLocation &loc){
736       CkArrayIndexMax idx = loc.getIndex();
737       arrayMap[idx.hash()] = loc;
738     }
739     void writeCheckpoint(){
740       std::map<CkHashCode, CkLocation>::iterator it;
741       for(it = arrayMap.begin();it!=arrayMap.end();it++){
742         CkLocation loc = it->second;
743         CkLocMgr *locMgr = loc.getManager();
744         CkArrayIndexMax idx = loc.getIndex();
745         CkGroupID gID = locMgr->ckGetGroupID();
746         p|gID;
747         p|idx;
748         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
749       }
750     }
751 };
752
753 void pupAllElements(PUP::er &p){
754 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
755   int numElements;
756   if(!p.isUnpacking()){
757     numElements = CkCountArrayElements();
758   }
759   p | numElements;
760   if(!p.isUnpacking()){
761     MemElementPacker packer(p);
762     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
763     packer.writeCheckpoint();
764   }
765 #endif
766 }
767
768 void CkMemCheckPT::startArrayCheckpoint(){
769 #if CMK_CHKP_ALL
770   int size;
771   {
772     PUP::sizer psizer;
773     pupAllElements(psizer);
774     size = psizer.size();
775   }
776   int packSize = size/sizeof(double)+1;
777   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
778   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
779   msg->len = size;
780   msg->cp_flag = 1;
781   int budPEs[2];
782   msg->bud1=CkMyPe();
783   msg->bud2=ChkptOnPe(CkMyPe());
784   {
785     PUP::toMem p(msg->packData);
786     pupAllElements(p);
787   }
788   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
789   if(chkpTable[0]) delete chkpTable[0];
790   chkpTable[0] = msg;
791   //send the checkpoint to my 
792   recvCount++;
793 #endif
794 }
795
796 void CkMemCheckPT::startCheckpoint(){
797 #if CMK_CONVERSE_MPI
798   int size;
799   {
800     PUP::sizer p;
801     _handleProcData(p,CmiFalse);
802     size = p.size();
803   }
804   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
805   procMsg->len = size;
806   procMsg->cp_flag = 1;
807   {
808     PUP::toMem p(procMsg->packData);
809     _handleProcData(p,CmiFalse);
810   }
811   int pointer = CpvAccess(curPointer);
812   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
813   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
814
815   {
816     PUP::sizer psizer;
817     pupAllElements(psizer);
818     size = psizer.size();
819   }
820   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
821   msg->len = size;
822   msg->cp_flag = 1;
823   int checksum;
824   {
825     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
826       PUP::checker p(msg->packData);
827       pupAllElements(p);
828       checksum = p.getChecksum();
829     }else{  
830 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
831       PUP::toMem p(msg->packData);
832       pupAllElements(p);
833     }
834   }
835   pointer = CpvAccess(curPointer);
836   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
837   CpvAccess(chkpBuf)[pointer] = msg;
838   
839   if(CkMyPe()==0)
840     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
841   
842   if(CkReplicaAlive()==1){
843     CpvAccess(recvdLocal) = 1;
844     if(CpvAccess(use_checksum)){
845       CpvAccess(localChecksum) = checksum;
846       //only one reaplica will send
847       if(CmiMyPartition()==0){
848         char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
849         *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
850         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
851         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
852       }
853     }else{
854       if(CmiMyPartition()==0){
855         envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
856         CkPackMessage(&env);
857         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
858         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
859       }
860     }
861     if(CmiMyPartition()==0){
862       notifyReplica = 1;
863       thisProxy[CkMyPe()].doneComparison(true);
864     }
865   }
866   if(CpvAccess(recvdRemote)==1){
867     //only partition 1 will do it
868     //compare the checkpoint 
869     int size = CpvAccess(chkpBuf)[pointer]->len;
870     if(CpvAccess(use_checksum)){
871       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
872         thisProxy[CkMyPe()].doneComparison(true);
873       }
874       else{
875         thisProxy[CkMyPe()].doneComparison(true);
876         //thisProxy[CkMyPe()].doneComparison(false);
877       }
878     }else{
879       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
880         thisProxy[CkMyPe()].doneComparison(true);
881       }
882       else{
883         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
884         thisProxy[CkMyPe()].doneComparison(true);
885         //thisProxy[CkMyPe()].doneComparison(false);
886       }
887       
888       if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
889     }
890     if(CkMyPe()==0)
891       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
892   }
893   else{
894     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
895       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
896       if(CpvAccess(remoteReady)==1){
897         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
898         {       
899           int pointer = CpvAccess(curPointer);
900           //send the proc data
901           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
902           procMsg->pointer = pointer;
903           envelope * env = (envelope *)(UsrToEnv(procMsg));
904           CkPackMessage(&env);
905           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
906           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
907           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
908             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
909         }
910         //send the array checkpoint data
911         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
912         msg->pointer = CpvAccess(curPointer);
913         envelope * env = (envelope *)(UsrToEnv(msg));
914         CkPackMessage(&env);
915         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
916         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
917         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
918           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
919       }else{
920         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
921           int pointer = CpvAccess(curPointer);
922           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
923           (CpvAccess(recoverProcBuf))->pointer = pointer;
924         }
925         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
926         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
927         CpvAccess(localReady) = 1;
928       }
929       //can continue work, no need to wait for my replica
930       int _ret = 1;
931       notifyReplica = 1;
932       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
933       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
934     }
935   }
936 #endif
937 }
938
939 void CkMemCheckPT::doneComparison(bool ret){
940   int _ret = 1;
941   if(!ret){
942     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
943     _ret = 0;
944   }else{
945     _ret = 1;
946   }
947   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
948   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
949 }
950
951 void CkMemCheckPT::doneRComparison(int ret){
952   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
953   if(ret==CkNumPes()&&!softFailureInjected){
954     CpvAccess(localChkpDone) = 1;
955     if(CpvAccess(remoteChkpDone) ==1){
956       thisProxy.doneBothComparison();
957     }else{
958       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
959     }
960   }
961   else{
962     ret = 0;
963     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
964     startTime = CmiWallTimer();
965     thisProxy.RollBack();
966   }
967   if(notifyReplica == 0){
968     //notify the replica am done
969     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
970     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
971     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
972     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
973     notifyReplica = 1;
974   }
975 }
976
977 void CkMemCheckPT::doneBothComparison(){
978   CpvAccess(recvdRemote) = 0;
979   CpvAccess(remoteReady) = 0;
980   CpvAccess(localReady) = 0;
981   CpvAccess(recvdLocal) = 0;
982   CpvAccess(localChkpDone) = 0;
983   CpvAccess(remoteChkpDone) = 0;
984   CpvAccess(remoteStarted) = 0;
985   CpvAccess(localStarted) = 0;
986   CpvAccess(_remoteCrashedNode) = -1;
987   CkMemCheckPT::replicaAlive = 1;
988   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
989   CpvAccess(curPointer)^=1;
990   inCheckpointing = 0;
991   notifyReplica = 0;
992   if(CkMyPe() == 0){
993     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
994   }
995   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
996 }
997
998 void CkMemCheckPT::RollBack(){
999   //restore group data
1000   checkpointed = 0;
1001   inRestarting = 1;
1002   int pointer = CpvAccess(curPointer)^1;//use the previous one
1003   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
1004   PUP::fromMem p(chkpMsg->packData);    
1005
1006   //destroy array elements
1007   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1008   int numGroups = CkpvAccess(_groupIDTable)->size();
1009   for(int i=0;i<numGroups;i++) {
1010     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1011     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1012     obj->flushStates();
1013     obj->ckJustMigrated();
1014   }
1015   //restore array elements
1016
1017   int numElements;
1018   p|numElements;
1019
1020   if(p.isUnpacking()){
1021     for(int i=0;i<numElements;i++){
1022       CkGroupID gID;
1023       CkArrayIndex idx;
1024       p|gID;
1025       p|idx;
1026       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1027       CmiAssert(mgr);
1028       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1029     }
1030   }
1031   
1032   softFailureInjected = false;
1033   CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
1034   contribute(cb);
1035 }
1036
1037   void CkMemCheckPT::notifyReplicaDie(int pe){
1038     replicaAlive = 0;
1039     CpvAccess(_remoteCrashedNode) = pe;
1040   }
1041
1042   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
1043   {
1044 #if CMK_CHKP_ALL
1045     int idx = 1;
1046     if(msg->bud1 == CkMyPe()){
1047       idx = 0;
1048     }
1049     int isChkpting = msg->cp_flag;
1050     if(isChkpting == 1){
1051       if(chkpTable[idx]) delete chkpTable[idx];
1052     }
1053     chkpTable[idx] = msg;
1054     if(isChkpting){
1055       recvCount++;
1056       if(recvCount == 2){
1057         if (where == CkCheckPoint_inMEM) {
1058           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1059         }
1060         else if (where == CkCheckPoint_inDISK) {
1061           // another barrier for finalize the writing using fsync
1062           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1063           contribute(0,NULL,CkReduction::sum_int,localcb);
1064         }
1065         else
1066           CmiAbort("Unknown checkpoint scheme");
1067         recvCount = 0;
1068       }
1069     }
1070 #endif
1071   }
1072
1073   // don't handle array elements
1074   static inline void _handleProcData(PUP::er &p, CmiBool create)
1075   {
1076     // save readonlys, and callback BTW
1077     CkPupROData(p);
1078
1079     // save mainchares 
1080     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
1081
1082 #ifndef CMK_CHARE_USE_PTR
1083     // save non-migratable chare
1084     CkPupChareData(p);
1085 #endif
1086
1087     // save groups into Groups.dat
1088     CkPupGroupData(p,create);
1089
1090     // save nodegroups into NodeGroups.dat
1091     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
1092   }
1093
1094   void CkMemCheckPT::sendProcData()
1095   {
1096     // find out size of buffer
1097     int size;
1098     {
1099       PUP::sizer p;
1100       _handleProcData(p,CmiTrue);
1101       size = p.size();
1102     }
1103     int packSize = size;
1104     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1105     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1106     {
1107       PUP::toMem p(msg->packData);
1108       _handleProcData(p,CmiTrue);
1109     }
1110     msg->pe = CkMyPe();
1111     msg->len = size;
1112     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1113     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1114   }
1115
1116   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1117   {
1118     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1119     CpvAccess(procChkptBuf) = msg;
1120     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1121     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1122   }
1123
1124   // ArrayElement call this function to give us the checkpointed data
1125   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1126   {
1127     int len = ckTable.length();
1128     int idx;
1129     for (idx=0; idx<len; idx++) {
1130       CkCheckPTInfo *entry = ckTable[idx];
1131       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1132     }
1133     CkAssert(idx < len);
1134     int isChkpting = msg->cp_flag;
1135     ckTable[idx]->updateBuffer(msg);
1136     if (isChkpting) {
1137       // all my array elements have returned their inmem data
1138       // inform starter processor that I am done.
1139       recvCount ++;
1140       if (recvCount == ckTable.length()) {
1141         if (where == CkCheckPoint_inMEM) {
1142           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1143         }
1144         else if (where == CkCheckPoint_inDISK) {
1145           // another barrier for finalize the writing using fsync
1146           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1147           contribute(0,NULL,CkReduction::sum_int,localcb);
1148         }
1149         else
1150           CmiAbort("Unknown checkpoint scheme");
1151         recvCount = 0;
1152       } 
1153     }
1154   }
1155
1156   // only used in disk checkpointing
1157   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1158   {
1159     delete m;
1160 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1161     system("sync");
1162 #endif
1163     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1164   }
1165
1166   // only is called on cpStarter when checkpoint is done
1167   void CkMemCheckPT::cpFinish()
1168   {
1169     CmiAssert(CkMyPe() == cpStarter);
1170     peCount++;
1171     // now that all processors have finished, activate callback
1172     if (peCount == 2) 
1173     {
1174       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1175       peCount = 0;
1176       thisProxy.report();
1177     }
1178   }
1179
1180   // for debugging, report checkpoint info
1181   void CkMemCheckPT::report()
1182   {
1183     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1184     inCheckpointing = 0;
1185 #if !CMK_CHKP_ALL
1186     int objsize = 0;
1187     int len = ckTable.length();
1188     for (int i=0; i<len; i++) {
1189       CkCheckPTInfo *entry = ckTable[i];
1190       CmiAssert(entry);
1191       objsize += entry->getSize();
1192     }
1193     CmiAssert(CpvAccess(procChkptBuf));
1194     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1195 #else
1196     if(CkMyPe()==0)
1197       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1198 #endif
1199   }
1200
1201   /*****************************************************************************
1202     RESTART Procedure
1203    *****************************************************************************/
1204
1205   // master processor of two buddies
1206   inline int CkMemCheckPT::isMaster(int buddype)
1207   {
1208 #if 0
1209     int mype = CkMyPe();
1210     //CkPrintf("ismaster: %d %d\n", pe, mype);
1211     if (CkNumPes() - totalFailed() == 2) {
1212       return mype > buddype;
1213     }
1214     for (int i=1; i<CkNumPes(); i++) {
1215       int me = (buddype+i)%CkNumPes();
1216       if (isFailed(me)) continue;
1217       if (me == mype) return 1;
1218       else return 0;
1219     }
1220     return 0;
1221 #else
1222     // smaller one
1223     int mype = CkMyPe();
1224     //CkPrintf("ismaster: %d %d\n", pe, mype);
1225     if (CkNumPes() - totalFailed() == 2) {
1226       return mype < buddype;
1227     }
1228 #if NODE_CHECKPOINT
1229     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1230     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1231 #else
1232       for (int i=1; i<CkNumPes(); i++) {
1233 #endif
1234         int me = (mype+i)%CkNumPes();
1235         if (isFailed(me)) continue;
1236         if (me == buddype) return 1;
1237         else return 0;
1238       }
1239       return 0;
1240 #endif
1241     }
1242
1243
1244
1245 #if 0
1246     // helper class to pup all elements that belong to same ckLocMgr
1247     class ElementDestoryer : public CkLocIterator {
1248       private:
1249         CkLocMgr *locMgr;
1250       public:
1251         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1252         void addLocation(CkLocation &loc) {
1253           CkArrayIndex idx=loc.getIndex();
1254           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1255           loc.destroy();
1256         }
1257     };
1258 #endif
1259
1260     // restore the bitmap vector for LB
1261     void CkMemCheckPT::resetLB(int diepe)
1262     {
1263 #if CMK_LBDB_ON
1264       int i;
1265       char *bitmap = new char[CkNumPes()];
1266       // set processor available bitmap
1267       get_avail_vector(bitmap);
1268       for (i=0; i<failedPes.length(); i++)
1269         bitmap[failedPes[i]] = 0; 
1270       bitmap[diepe] = 0;
1271
1272 #if CK_NO_PROC_POOL
1273       set_avail_vector(bitmap);
1274 #endif
1275
1276       // if I am the crashed pe, rebuild my failedPEs array
1277       if (CkMyNode() == diepe)
1278         for (i=0; i<CkNumPes(); i++) 
1279           if (bitmap[i]==0) failed(i);
1280
1281       delete [] bitmap;
1282 #endif
1283     }
1284
1285     static double restartT;
1286
1287     // in case when failedPe dies, everybody go through its checkpoint table:
1288     // destory all array elements
1289     // recover lost buddies
1290     // reconstruct all array elements from check point data
1291     // called on all processors
1292     void CkMemCheckPT::restart(int diePe)
1293     {
1294 #if CMK_MEM_CHECKPOINT
1295       thisFailedPe = diePe;
1296       double curTime = CmiWallTimer();
1297       if (CkMyPe() == thisFailedPe){
1298         restartT = CmiWallTimer();
1299         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1300       }
1301       stage = (char*)"resetLB";
1302       startTime = curTime;
1303       if (CkMyPe() == thisFailedPe)
1304         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1305
1306 //#if CK_NO_PROC_POOL
1307 //      failed(diePe);  // add into the list of failed pes
1308 //#endif
1309
1310 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1311
1312       inRestarting = 1;
1313
1314       // disable load balancer's barrier
1315       //if (CkMyPe() != diePe) resetLB(diePe);
1316
1317       CKLOCMGR_LOOP(mgr->startInserting(););
1318
1319
1320       if(CmiNumPartition()==1){
1321         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1322       }else{
1323         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1324         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1325       }
1326 #endif
1327     }
1328
1329     // loally remove all array elements
1330     void CkMemCheckPT::removeArrayElements()
1331     {
1332 #if CMK_MEM_CHECKPOINT
1333       int len = ckTable.length();
1334       double curTime = CmiWallTimer();
1335       if (CkMyPe() == thisFailedPe) 
1336         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1337       stage = (char*)"removeArrayElements";
1338       startTime = curTime;
1339
1340       //  if (cpCallback.isInvalid()) 
1341       //          CkPrintf("invalid pe %d\n",CkMyPe());
1342       //          CkAbort("Didn't set restart callback\n");;
1343       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1344
1345       // get rid of all buffering and remote recs
1346       // including destorying all array elements
1347 #if CK_NO_PROC_POOL  
1348       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1349 #else
1350       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1351 #endif
1352       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1353 #endif
1354     }
1355
1356     // flush state in reduction manager
1357     void CkMemCheckPT::resetReductionMgr()
1358     {
1359       if (CkMyPe() == thisFailedPe) 
1360         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1361       stage = (char *)"resetReductionMgr";
1362       int numGroups = CkpvAccess(_groupIDTable)->size();
1363       for(int i=0;i<numGroups;i++) {
1364         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1365         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1366         obj->flushStates();
1367         obj->ckJustMigrated();
1368       }
1369
1370       //reset CmiReduce
1371       CmiResetReductions();
1372
1373       // reset again
1374       //CpvAccess(_qd)->flushStates();
1375       if(CmiNumPartition()==1){
1376         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1377       }
1378       else{
1379         if (CkMyPe() == thisFailedPe) 
1380           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1381         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1382       }
1383     }
1384
1385     // recover the lost buddies
1386     void CkMemCheckPT::recoverBuddies()
1387     {
1388       int idx;
1389       int len = ckTable.length();
1390       // ready to flush reduction manager
1391       // cannot be CkMemCheckPT::restart because destory will modify states
1392       double curTime = CmiWallTimer();
1393       if (CkMyPe() == thisFailedPe)
1394         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1395       stage = (char *)"recoverBuddies";
1396       if (CkMyPe() == thisFailedPe)
1397         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1398       startTime = curTime;
1399
1400       // recover buddies
1401       expectCount = 0;
1402 #if !CMK_CHKP_ALL
1403       for (idx=0; idx<len; idx++) {
1404         CkCheckPTInfo *entry = ckTable[idx];
1405         if (entry->pNo == thisFailedPe) {
1406 #if CK_NO_PROC_POOL
1407           // find a new buddy
1408           int budPe = BuddyPE(CkMyPe());
1409 #else
1410           int budPe = thisFailedPe;
1411 #endif
1412           CkArrayCheckPTMessage *msg = entry->getCopy();
1413           msg->bud1 = budPe;
1414           msg->bud2 = CkMyPe();
1415           msg->cp_flag = 0;            // not checkpointing
1416           thisProxy[budPe].recoverEntry(msg);
1417           expectCount ++;
1418         }
1419       }
1420 #else
1421       //send to failed pe
1422       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1423 #if CK_NO_PROC_POOL
1424         // find a new buddy
1425         int budPe = BuddyPE(CkMyPe());
1426 #else
1427         int budPe = thisFailedPe;
1428 #endif
1429         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1430         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1431         msg->cp_flag = 0;            // not checkpointing
1432         msg->bud1 = budPe;
1433         msg->bud2 = CkMyPe();
1434         thisProxy[budPe].recoverEntry(msg);
1435         expectCount ++;
1436       }
1437 #endif
1438
1439       if (expectCount == 0) {
1440         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1441       }
1442       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1443     }
1444
1445     void CkMemCheckPT::gotData()
1446     {
1447       ackCount ++;
1448       if (ackCount == expectCount) {
1449         ackCount = 0;
1450         expectCount = -1;
1451         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1452         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1453       }
1454     }
1455
1456     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1457     {
1458
1459       for (int i=0; i<n; i++) {
1460         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1461         mgr->updateLocation(idx[i], nowOnPe);
1462       }
1463       thisProxy[nowOnPe].gotReply();
1464     }
1465
1466     // restore array elements
1467     void CkMemCheckPT::recoverArrayElements()
1468     {
1469       double curTime = CmiWallTimer();
1470       int len = ckTable.length();
1471       if (CkMyPe() == thisFailedPe)
1472         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1473       stage = (char *)"recoverArrayElements";
1474       startTime = curTime;
1475       int flag = 0;
1476       // recover all array elements
1477       int count = 0;
1478
1479 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1480       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1481       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1482 #endif
1483
1484 #if !CMK_CHKP_ALL
1485       for (int idx=0; idx<len; idx++)
1486       {
1487         CkCheckPTInfo *entry = ckTable[idx];
1488 #if CK_NO_PROC_POOL
1489         // the bigger one will do 
1490         //    if (CkMyPe() < entry->pNo) continue;
1491         if (!isMaster(entry->pNo)) continue;
1492 #else
1493         // smaller one do it, which has the original object
1494         if (CkMyPe() == entry->pNo+1 || 
1495             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1496 #endif
1497         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1498
1499         entry->updateBuddy(CkMyPe(), entry->pNo);
1500         CkArrayCheckPTMessage *msg = entry->getCopy();
1501         // gzheng
1502         //thisProxy[CkMyPe()].inmem_restore(msg);
1503         inmem_restore(msg);
1504 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1505         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1506         int homePe = mgr->homePe(msg->index);
1507         if (homePe != CkMyPe()) {
1508           gmap[homePe].push_back(msg->locMgr);
1509           imap[homePe].push_back(msg->index);
1510         }
1511 #endif
1512         CkFreeMsg(msg);
1513         count ++;
1514       }
1515 #else
1516       char * packData;
1517       if(CmiNumPartition()==1){
1518         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1519         packData = (char *)msg->packData;
1520 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1521         recoverAll(packData,gmap,imap);
1522 #else
1523         recoverAll(packData);
1524 #endif
1525         CkFreeMsg(msg);
1526       }
1527       else{
1528         int pointer = CpvAccess(curPointer);
1529         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1530         packData = msg->packData;
1531 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1532         recoverAll(packData,gmap,imap);
1533 #else
1534         recoverAll(packData);
1535 #endif
1536         CkFreeMsg(msg);
1537       }
1538 #endif
1539 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1540       for (int i=0; i<CkNumPes(); i++) {
1541         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1542           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1543           flag++;       
1544         }
1545       }
1546       delete [] imap;
1547       delete [] gmap;
1548 #endif
1549       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1550       //if (CkMyPe() == thisFailedPe)
1551
1552       CKLOCMGR_LOOP(mgr->doneInserting(););
1553
1554       // _crashedNode = -1;
1555       CpvAccess(_crashedNode) = -1;
1556 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1557       if (CkMyPe() == 0)
1558         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1559 #else
1560       if(flag == 0)
1561       {
1562         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1563       }
1564 #endif
1565     }
1566
1567     void CkMemCheckPT::gotReply(){
1568       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1569     }
1570
1571     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1572 #if CMK_CHKP_ALL
1573       PUP::fromMem p(packData);
1574       int numElements;
1575       p|numElements;
1576       if(p.isUnpacking()){
1577         for(int i=0;i<numElements;i++){
1578           CkGroupID gID;
1579           CkArrayIndex idx;
1580           p|gID;
1581           p|idx;
1582           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1583           int homePe = mgr->homePe(idx);
1584 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1585           mgr->resume(idx,p,CmiTrue,CmiTrue);
1586 #else
1587           if(CmiNumPartition()==1)
1588             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1589           else{
1590             if(CkMyPe()==thisFailedPe){
1591               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1592             }
1593             else{
1594               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1595             }
1596           }
1597 #endif
1598 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1599           homePe = mgr->homePe(idx);
1600           if (homePe != CkMyPe()) {
1601             gmap[homePe].push_back(gID);
1602             imap[homePe].push_back(idx);
1603           }
1604 #endif
1605         }
1606       }
1607 #endif
1608     }
1609
1610
1611     // on every processor
1612     // turn load balancer back on
1613     void CkMemCheckPT::finishUp()
1614     {
1615       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1616       //CKLOCMGR_LOOP(mgr->doneInserting(););
1617
1618       if (CkMyPe() == thisFailedPe)
1619       {
1620         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1621         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1622         fflush(stdout);
1623       }
1624
1625
1626       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1627       inRestarting = 0;
1628       maxIter = -1;
1629 #if CMK_CONVERSE_MPI    
1630       if(CmiNumPartition()!=1){
1631         CpvAccess(recvdProcChkp) = 0;
1632         CpvAccess(recvdArrayChkp) = 0;
1633         CpvAccess(curPointer)^=1;
1634         //notify my replica, restart is done
1635         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1636           CpvAccess(remoteStarted) =0;
1637           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1638           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1639           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1640         }
1641       }
1642       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1643         lastPingTime = CmiWallTimer();
1644         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1645       }
1646       //inject next failure
1647       if(killFlag==2){
1648         if(CkMyPe()==0&&CmiMyPartition()==0)
1649           thisProxy[CkMyPe()].generateFailure();
1650       }
1651 #endif
1652
1653 #if CK_NO_PROC_POOL
1654 #if NODE_CHECKPOINT
1655       int numnodes = CmiNumPhysicalNodes();
1656 #else
1657       int numnodes = CkNumPes();
1658 #endif
1659       if (numnodes-totalFailed() <=2) {
1660         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1661         _memChkptOn = 0;
1662       }
1663 #endif
1664     }
1665
1666     void CkMemCheckPT::recoverFromSoftFailure()
1667     {
1668       inRestarting = 0;
1669       maxIter = -1;
1670       CpvAccess(recvdRemote) = 0;
1671       CpvAccess(recvdLocal) = 0;
1672       CpvAccess(localChkpDone) = 0;
1673       CpvAccess(remoteChkpDone) = 0;
1674       CpvAccess(remoteReady) = 0;
1675       CpvAccess(localReady) = 0;
1676       inCheckpointing = 0;
1677       notifyReplica = 0;
1678       CpvAccess(remoteStarted) = 0;
1679       CpvAccess(localStarted) = 0;
1680       CpvAccess(_remoteCrashedNode) = -1;
1681       CkMemCheckPT::replicaAlive = 1;
1682       inCheckpointing = 0;
1683       notifyReplica = 0;
1684       if(CkMyPe() == 0){
1685         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1686       }
1687       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1688       if(killFlag==2 && CmiMyPartition()==1 && CkMyPe()==0){
1689         thisProxy[CkMyPe()].generateSoftFailure();
1690       }
1691     }
1692     // called only on 0
1693     void CkMemCheckPT::quiescence(CkCallback &cb)
1694     {
1695       static int pe_count = 0;
1696       pe_count ++;
1697       CmiAssert(CkMyPe() == 0);
1698       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1699       if (pe_count == CkNumPes()) {
1700         pe_count = 0;
1701         cb.send();
1702       }
1703     }
1704
1705     // User callable function - to start a checkpoint
1706     // callback cb is used to pass control back
1707     void CkStartMemCheckpoint(CkCallback &cb)
1708     {
1709 #if CMK_MEM_CHECKPOINT
1710       if(CkMemCheckPT::aboutToDie&&CmiMyPartition()==0)
1711         return;
1712       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1713       /*if (_memChkptOn == 0) {
1714         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1715         cb.send();
1716         return;
1717         }*/
1718       if (CkInRestarting()) {
1719         // trying to checkpointing during restart
1720         cb.send();
1721         return;
1722       }
1723       if(CkInCheckpointing())
1724         return;
1725       // store user callback and user data
1726       CkMemCheckPT::cpCallback = cb;
1727
1728
1729       //send to my replica that checkpoint begins 
1730       if(CkReplicaAlive()==1){
1731         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1732         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1733         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1734       }
1735       CpvAccess(localStarted) = 1;
1736       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1737       checkptMgr.chkpLocalStart();
1738       // broadcast to start check pointing
1739       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1740         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1741         checkptMgr.doItNow(CkMyPe());
1742       }
1743 #else
1744       // when mem checkpoint is disabled, invike cb immediately
1745       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1746       cb.send();
1747 #endif
1748     }
1749
1750     void CkRestartCheckPoint(int diePe)
1751     {
1752       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1753       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1754       // broadcast
1755       checkptMgr.restart(diePe);
1756     }
1757
1758     static int _diePE = -1;
1759
1760     // callback function used locally by ccs handler
1761     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1762     {
1763       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1764       CkRestartCheckPoint(_diePE);
1765     }
1766
1767
1768     // called on crashed PE
1769     static void restartBeginHandler(char *msg)
1770     {
1771       CmiFree(msg);
1772 #if CMK_MEM_CHECKPOINT
1773 #if CMK_USE_BARRIER
1774       if(CkMyPe()!=_diePE){
1775         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1776         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1777         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1778         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1779       }else{
1780         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1781         CkRestartCheckPointCallback(NULL, NULL);
1782       }
1783 #else
1784       static int count = 0;
1785       CmiAssert(CkMyPe() == _diePE);
1786       count ++;
1787       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1788         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1789         CkRestartCheckPointCallback(NULL, NULL);
1790         count = 0;
1791       }
1792 #endif
1793 #endif
1794     }
1795
1796     extern void _discard_charm_message();
1797     extern void _resume_charm_message();
1798
1799     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1800       return data;
1801     }
1802
1803     static void restartBcastHandler(char *msg)
1804     {
1805 #if CMK_MEM_CHECKPOINT
1806       // advance phase counter
1807       CkMemCheckPT::inRestarting = 1;
1808       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1809       // gzheng
1810       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1811
1812       if (CkMyPe()==_diePE)
1813         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1814
1815       // reset QD counters
1816       /*  gzheng
1817           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1818        */
1819
1820       /*  gzheng
1821           if (CkMyPe()==_diePE)
1822           CkRestartCheckPointCallback(NULL, NULL);
1823        */
1824       CmiFree(msg);
1825
1826       _resume_charm_message();
1827
1828       // reduction
1829       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1830       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1831 #if CMK_USE_BARRIER
1832       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1833 #else
1834       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1835 #endif 
1836       checkpointed = 0;
1837 #endif
1838     }
1839
1840     extern void _initDone();
1841
1842     bool compare(char * buf1, char *buf2){
1843       if(CkMyPe()==0)
1844         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1845       PUP::checker pchecker(buf1,buf2);
1846       pchecker.skip();
1847       int numElements;
1848       pchecker|numElements;
1849       for(int i=0;i<numElements;i++){
1850         CkGroupID gID;
1851         CkArrayIndex idx;
1852
1853         pchecker|gID;
1854         pchecker|idx;
1855
1856         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1857         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1858       }
1859       if(CkMyPe()==0)
1860         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1861       //int fault_num = pchecker.getFaultNum();
1862       bool result = pchecker.getResult();
1863       /*if(!result){
1864         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1865       }*/
1866       return result;
1867     }
1868
1869     int getChecksum(char * buf){
1870       PUP::checker pchecker(buf);
1871       pchecker.skip();
1872       int numElements;
1873       pchecker|numElements;
1874 //      CkPrintf("num %d\n",numElements);
1875       for(int i=0;i<numElements;i++){
1876         CkGroupID gID;
1877         CkArrayIndex idx;
1878
1879         pchecker|gID;
1880         pchecker|idx;
1881
1882         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1883         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1884       }
1885       return pchecker.getChecksum();
1886     }
1887
1888     static void recvRemoteChkpHandler(char *msg){
1889       CpvAccess(remoteChkpDone) = 1;
1890       if(CpvAccess(use_checksum)){
1891         if(CkMyPe()==0)
1892           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1893         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1894         CpvAccess(recvdRemote) = 1;
1895         if(CpvAccess(recvdLocal)==1){
1896           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1897             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1898           }
1899           else{
1900             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1901             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1902           }
1903         }
1904         CmiFree(msg);
1905       }else{
1906         envelope *env = (envelope *)msg;
1907         CkUnpackMessage(&env);
1908         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1909         if(CpvAccess(recvdLocal)==1){
1910           int pointer = CpvAccess(curPointer);
1911           int size = CpvAccess(chkpBuf)[pointer]->len;
1912           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1913             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1914           }else
1915           {
1916             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1917             //CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1918           }
1919           CmiFree(msg);
1920           if(CkMyPe()==0)
1921             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1922         }else{
1923           CpvAccess(recvdRemote) = 1;
1924           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1925           CpvAccess(buddyBuf) = chkpMsg;
1926         }
1927       }
1928     }
1929
1930     static void replicaRecoverHandler(char *msg){
1931       //fflush(stdout);
1932       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1933       CpvAccess(remoteChkpDone) = 1;
1934       if(CpvAccess(localChkpDone) == 1)
1935         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1936       else{  
1937         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1938         //CmiAbort("impossible state");
1939       }
1940       CmiFree(msg);
1941     }
1942
1943     
1944     static void replicaBeginFailureInjectionHandler(char * msg){
1945       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1946       checkptMgr.generateFailure();
1947       CmiFree(msg);
1948     }
1949     
1950     static void replicaChkpDoneHandler(char *msg){
1951       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1952       CpvAccess(remoteChkpDone) = 1;
1953       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1954       if(CpvAccess(localChkpDone) == 1)
1955         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1956       else  
1957         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1958       CmiFree(msg);
1959     }
1960
1961     static void replicaDieHandler(char * msg){
1962 #if CMK_CONVERSE_MPI
1963       //broadcast to every one in my replica
1964       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1965       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1966 #endif
1967     }
1968
1969     static void replicaChkpStartHandler(char * msg){
1970       if(CkMyPe()==0){
1971         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1972       }
1973       CpvAccess(remoteStarted) =1;
1974       if(CpvAccess(localStarted)==1){    
1975         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1976         checkptMgr.doItNow(0);
1977       }
1978     }
1979
1980     static void replicaDieBcastHandler(char *msg){
1981       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1982       CpvAccess(_remoteCrashedNode) = diePe;
1983       if(CkMyPe()==diePe){
1984         CmiPrintf("pe %d in replica world die\n",diePe);
1985         fflush(stdout);
1986       }
1987       
1988       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1989         find_spare_mpirank(diePe,CmiMyPartition()^1);
1990       }
1991
1992       //what if am already ready to checkpoint
1993       if(CpvAccess(resilience)!=1){
1994         CkMemCheckPT::replicaAlive = 0;
1995         if(CpvAccess(localStarted)==1){    
1996           if(CkMyPe()==0){
1997             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1998             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1999             checkptMgr.doItNow(0);
2000           }
2001         }
2002         //broadcast to my partition to get local max iter
2003         else{
2004           if(CkMyPe()==diePe){
2005             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
2006           }
2007           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
2008         }
2009       }
2010
2011       CmiFree(msg);
2012     }
2013
2014     static void recoverRemoteProcDataHandler(char *msg){
2015       envelope *env = (envelope *)msg;
2016       CkUnpackMessage(&env);
2017       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
2018
2019       //store the checkpoint
2020       int pointer = procMsg->pointer;
2021
2022
2023       if(CkMyPe()==CpvAccess(_crashedNode)){
2024         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
2025         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
2026         PUP::fromMem p(procMsg->packData);
2027         _handleProcData(p,CmiTrue);
2028         _initDone();
2029         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
2030       }
2031       else{
2032         //shud never happen
2033         CmiAbort("non crashed pe received proc data");
2034         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
2035         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
2036         //_handleProcData(p,CmiFalse);
2037       }
2038       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2039       CKLOCMGR_LOOP(mgr->startInserting(););
2040
2041       CpvAccess(recvdProcChkp) =1;
2042       if(CpvAccess(recvdArrayChkp)==1){
2043         _resume_charm_message();
2044         _diePE = CpvAccess(_crashedNode);
2045         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2046         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2047         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2048 #if CMK_USE_BARRIER
2049         if(CpvAccess(resilience)==1){
2050           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2051         }else
2052           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2053 #else
2054         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2055 #endif 
2056       }
2057     }
2058
2059     static void recoverRemoteArrayDataHandler(char *msg){
2060       envelope *env = (envelope *)msg;
2061       CkUnpackMessage(&env);
2062       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
2063
2064       //store the checkpoint
2065       int pointer = chkpMsg->pointer;
2066       CpvAccess(curPointer) = pointer;
2067       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
2068       CpvAccess(chkpBuf)[pointer] = chkpMsg;
2069       CpvAccess(recvdArrayChkp) =1;
2070       CkMemCheckPT::inRestarting = 1;
2071
2072       if(CkMyPe()==CpvAccess(_crashedNode))
2073         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
2074       
2075       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
2076         _resume_charm_message();
2077         _diePE = CpvAccess(_crashedNode);
2078         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
2079         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2080         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2081         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2082 #if CMK_USE_BARRIER
2083         if(CpvAccess(resilience)==1){
2084           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2085         }else
2086           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2087 #else
2088         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2089 #endif 
2090       }
2091     }
2092
2093     static void recvPhaseHandler(char * msg)
2094     {
2095       //decrease the phase number so that the crashed replica can communicate with the healthy one
2096       CpvAccess(_curRestartPhase)--;
2097       
2098       CkMemCheckPT::inRestarting = 1;
2099       CmiFree(msg);
2100       //notify the buddy in the replica now i can receive the checkpoint msg
2101       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
2102         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2103         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
2104         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
2105         //timer start
2106         if(CpvAccess(_crashedNode)==CkMyPe()){
2107           CkMemCheckPT::startTime = restartT = CmiWallTimer();
2108           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
2109         }
2110       }else{
2111         CpvAccess(curPointer)^=1;
2112         CkMemCheckPT::inRestarting = 1;
2113         _resume_charm_message();
2114         _diePE = CpvAccess(_crashedNode);
2115       }
2116     }
2117     
2118     static void askRecoverDataHandler(char * msg){
2119       CmiFree(msg);
2120       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2121         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
2122       if(CpvAccess(resilience)!=1){
2123         CpvAccess(remoteReady)=1;
2124         if(CpvAccess(localReady)==1){
2125           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2126           {     
2127             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
2128             CkPackMessage(&env);
2129             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2130             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2131             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2132           }
2133           //send the array checkpoint data
2134           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2135           CkPackMessage(&env);
2136           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2137           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2138           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2139             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2140         }
2141       }else{
2142         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2143         int pointer = CpvAccess(curPointer)^1;
2144         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2145         procMsg->pointer = pointer;
2146         envelope * env = (envelope *)(UsrToEnv(procMsg));
2147         CkPackMessage(&env);
2148         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2149         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2150         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2151         
2152         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2153         arrayMsg->pointer = pointer;
2154         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2155         CkPackMessage(&env1);
2156         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2157         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2158         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2159         if(CkMemCheckPT::aboutToDie&&CpvAccess(localStarted)){
2160           CkPrintf("[%d][%d]send checkpoint start notification to my partition\n",CmiMyPartition(), CkMyPe());
2161           char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2162           CmiSetHandler(msg, replicaChkpStartHandlerIdx);
2163           CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
2164           CkMemCheckPT::aboutToDie = 0;
2165         }
2166       }
2167     }
2168     // called on crashed processor
2169     static void recoverProcDataHandler(char *msg)
2170     {
2171 #if CMK_MEM_CHECKPOINT
2172       int i;
2173       envelope *env = (envelope *)msg;
2174       CkUnpackMessage(&env);
2175       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2176       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2177       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2178       PUP::fromMem p(procMsg->packData);
2179       _handleProcData(p,CmiTrue);
2180
2181       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2182       // gzheng
2183       CKLOCMGR_LOOP(mgr->startInserting(););
2184
2185       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2186       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2187       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2188       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2189
2190       _initDone();
2191       //   CpvAccess(_qd)->flushStates();
2192       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2193 #endif
2194     }
2195     //for replica, got the phase number from my neighbor processor in the current partition
2196     static void askPhaseHandler(char *msg)
2197     {
2198 #if CMK_MEM_CHECKPOINT
2199       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2200       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2201       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2202       CmiSetHandler(msg, recvPhaseHandlerIdx);
2203       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2204 #endif
2205     }
2206     // called on its backup processor
2207     // get backup message buffer and sent to crashed processor
2208     static void askProcDataHandler(char *msg)
2209     {
2210 #if CMK_MEM_CHECKPOINT
2211       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2212       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2213       if (CpvAccess(procChkptBuf) == NULL)  {
2214         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2215         CkAbort("no checkpoint found");
2216       }
2217       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2218       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2219
2220       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2221
2222       CkPackMessage(&env);
2223       CmiSetHandler(env, recoverProcDataHandlerIdx);
2224       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2225       CpvAccess(procChkptBuf) = NULL;
2226       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2227 #endif
2228     }
2229
2230     // called on PE 0
2231     void qd_callback(void *m)
2232     {
2233       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2234       fflush(stdout);
2235       CkFreeMsg(m);
2236       if(CmiNumPartition()==1){
2237 #ifdef CMK_SMP
2238         for(int i=0;i<CmiMyNodeSize();i++){
2239           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2240           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2241           CmiSetHandler(msg, askProcDataHandlerIdx);
2242           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2243           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2244         }
2245         return;
2246 #endif
2247         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2248         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2249         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2250         CmiSetHandler(msg, askProcDataHandlerIdx);
2251         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2252         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2253       }
2254       else{
2255         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2256         CmiSetHandler(msg, recvPhaseHandlerIdx);
2257         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2258       }
2259     }
2260
2261     // on crashed node
2262     void CkMemRestart(const char *dummy, CkArgMsg *args)
2263     {
2264 #if CMK_MEM_CHECKPOINT
2265       _diePE = CmiMyNode();
2266       CpvAccess( _crashedNode )= CmiMyNode();
2267       CkMemCheckPT::inRestarting = 1;
2268       _discard_charm_message();
2269
2270       /*if(CmiMyRank()==0){
2271         CkCallback cb(qd_callback);
2272         CkStartQD(cb);
2273         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2274         }*/
2275       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2276       if(CmiNumPartition()==1){
2277         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2278         restartT = CmiWallTimer();
2279         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2280         fflush(stdout);
2281         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2282         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2283         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2284         CmiSetHandler(msg, askProcDataHandlerIdx);
2285         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2286         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2287       }
2288       else{
2289         CkCallback cb(qd_callback);
2290         CkStartQD(cb);
2291       }
2292 #else
2293       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2294 #endif
2295     }
2296
2297     // can be called in other files
2298     // return true if it is in restarting
2299     extern "C"
2300       int CkInRestarting()
2301       {
2302 #if CMK_MEM_CHECKPOINT
2303         if (CpvAccess( _crashedNode)!=-1) return 1;
2304         // gzheng
2305         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2306         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2307         return CkMemCheckPT::inRestarting;
2308 #else
2309         return 0;
2310 #endif
2311       }
2312
2313     extern "C"
2314       int CkReplicaAlive()
2315       {
2316         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2317         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2318         return CkMemCheckPT::replicaAlive;
2319
2320         /*if(CkMemCheckPT::replicaDead==1)
2321           return 0;
2322           else          
2323           return 1;*/
2324       }
2325
2326     extern "C"
2327       int CkInCheckpointing()
2328       {
2329         return CkMemCheckPT::inCheckpointing;
2330       }
2331
2332     extern "C"
2333       void CkSetInLdb(){
2334 #if CMK_MEM_CHECKPOINT
2335         CkMemCheckPT::inLoadbalancing = 1;
2336 #endif
2337       }
2338
2339     extern "C"
2340       int CkInLdb(){
2341 #if CMK_MEM_CHECKPOINT
2342         return CkMemCheckPT::inLoadbalancing;
2343 #endif
2344         return 0;
2345       }
2346
2347     extern "C"
2348       void CkResetInLdb(){
2349 #if CMK_MEM_CHECKPOINT
2350         CkMemCheckPT::inLoadbalancing = 0;
2351 #endif
2352       }
2353
2354     /*****************************************************************************
2355       module initialization
2356      *****************************************************************************/
2357
2358     static int arg_where = CkCheckPoint_inMEM;
2359
2360 #if CMK_MEM_CHECKPOINT
2361     void init_memcheckpt(char **argv)
2362     {
2363       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2364         arg_where = CkCheckPoint_inDISK;
2365       }
2366       CpvInitialize(int, use_checksum);
2367       CpvInitialize(int, resilience);
2368       CpvAccess(use_checksum)=0;
2369       CpvAccess(resilience)=0;//TODO should set a default resilience level
2370       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2371         CpvAccess(use_checksum)=1;
2372       }
2373       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2374         CpvAccess(resilience)=1;
2375       }
2376       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2377         CpvAccess(resilience)=2;
2378       }
2379       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2380         CpvAccess(resilience)=3;
2381       }
2382       // initiliazing _crashedNode variable
2383       CpvInitialize(int, _crashedNode);
2384       CpvInitialize(int, _remoteCrashedNode);
2385       CpvAccess(_crashedNode) = -1;
2386       CpvAccess(_remoteCrashedNode) = -1;
2387       init_FI(argv);
2388     }
2389 #endif
2390
2391     class CkMemCheckPTInit: public Chare {
2392       public:
2393         CkMemCheckPTInit(CkArgMsg *m) {
2394 #if CMK_MEM_CHECKPOINT
2395           if (arg_where == CkCheckPoint_inDISK) {
2396             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2397           }
2398
2399           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2400           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2401 #endif
2402         }
2403     };
2404
2405     static void notifyHandler(char *msg)
2406     {
2407 #if CMK_MEM_CHECKPOINT
2408       CmiFree(msg);
2409       /* immediately increase restart phase to filter old messages */
2410       CpvAccess(_curRestartPhase) ++;
2411       CpvAccess(_qd)->flushStates();
2412       _discard_charm_message();
2413
2414 #endif
2415     }
2416
2417     extern "C"
2418       void notify_crash(int node)
2419       {
2420 #ifdef CMK_MEM_CHECKPOINT
2421         CpvAccess( _crashedNode) = node;
2422 #ifdef CMK_SMP
2423         for(int i=0;i<CkMyNodeSize();i++){
2424           CpvAccessOther(_crashedNode,i)=node;
2425         }
2426 #endif
2427         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2428         CkMemCheckPT::inRestarting = 1;
2429
2430         // this may be in interrupt handler, send a message to reset QD
2431         int pe = CmiNodeFirst(CkMyNode());
2432         for(int i=0;i<CkMyNodeSize();i++){
2433           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2434           CmiSetHandler(msg, notifyHandlerIdx);
2435           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2436         }
2437 #endif
2438       }
2439
2440     extern "C" void (*notify_crash_fn)(int node);
2441
2442
2443 #if CMK_CONVERSE_MPI
2444     void buddyDieHandler(char *msg)
2445     {
2446 #if CMK_MEM_CHECKPOINT
2447       // notify
2448       CkMemCheckPT::inRestarting = 1;
2449       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2450       notify_crash(diepe);
2451       // send message to crash pe to let it restart
2452       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2453       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2454       int buddy = obj->BuddyPE(CmiMyPe());
2455       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2456       if (buddy == diepe)  {
2457         mpi_restart_crashed(diepe, newrank);
2458       }
2459 #endif
2460     }
2461
2462     void pingHandler(void *msg)
2463     {
2464       lastPingTime = CmiWallTimer();
2465       CmiFree(msg);
2466     }
2467
2468     void pingCheckHandler()
2469     {
2470 #if CMK_MEM_CHECKPOINT
2471       double now = CmiWallTimer();
2472       if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2473         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2474         int i, pe, buddy;
2475         // tell everyone the buddy dies
2476         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2477         for (i = 1; i < CmiNumPes(); i++) {
2478           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2479           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2480         }
2481         buddy = pe;
2482         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2483         fflush(stdout);
2484         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2485         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2486         CmiSetHandler(msg, buddyDieHandlerIdx);
2487         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2488         //send to everyone in the other world
2489         if(CmiNumPartition()!=1){
2490           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2491           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2492           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2493           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2494         }
2495       }
2496         else 
2497           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2498 #endif
2499       }
2500
2501       void pingBuddy()
2502       {
2503 #if CMK_MEM_CHECKPOINT
2504         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2505         if (obj) {
2506           int buddy = obj->BuddyPE(CkMyPe());
2507           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2508           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2509           CmiSetHandler(msg, pingHandlerIdx);
2510           CmiGetRestartPhase(msg) = 9999;
2511           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2512         }
2513         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2514 #endif
2515       }
2516 #endif
2517
2518       // initproc
2519       void CkRegisterRestartHandler( )
2520       {
2521 #if CMK_MEM_CHECKPOINT
2522         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2523         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2524         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2525         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2526         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2527         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2528
2529         //for replica
2530         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2531         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2532         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2533         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2534         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2535         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2536         replicaBeginFailureInjectionHandlerIdx = CkRegisterHandler((CmiHandler)replicaBeginFailureInjectionHandler);
2537         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2538         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2539         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2540         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2541         replicaDyingNotifyHandlerIdx = CkRegisterHandler((CmiHandler)replicaDyingNotify);
2542
2543 #if CMK_CONVERSE_MPI
2544         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2545         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2546         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2547 #endif
2548
2549         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2550         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2551         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2552         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2553         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2554         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2555         CpvInitialize(int,curPointer);
2556         CpvInitialize(int,recvdLocal);
2557         CpvInitialize(int,localChkpDone);
2558         CpvInitialize(int,remoteChkpDone);
2559         CpvInitialize(int,remoteStarted);
2560         CpvInitialize(int,localStarted);
2561         CpvInitialize(int,localReady);
2562         CpvInitialize(int,remoteReady);
2563         CpvInitialize(int,recvdRemote);
2564         CpvInitialize(int,recvdProcChkp);
2565         CpvInitialize(int,localChecksum);
2566         CpvInitialize(int,remoteChecksum);
2567         CpvInitialize(int,recvdArrayChkp);
2568
2569         CpvAccess(procChkptBuf) = NULL;
2570         CpvAccess(buddyBuf) = NULL;
2571         CpvAccess(recoverProcBuf) = NULL;
2572         CpvAccess(recoverArrayBuf) = NULL;
2573         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2574         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2575         CpvAccess(chkpBuf)[0] = NULL;
2576         CpvAccess(chkpBuf)[1] = NULL;
2577         CpvAccess(localProcChkpBuf)[0] = NULL;
2578         CpvAccess(localProcChkpBuf)[1] = NULL;
2579
2580         CpvAccess(curPointer) = 0;
2581         CpvAccess(recvdLocal) = 0;
2582         CpvAccess(localChkpDone) = 0;
2583         CpvAccess(remoteChkpDone) = 0;
2584         CpvAccess(remoteStarted) = 0;
2585         CpvAccess(localStarted) = 0;
2586         CpvAccess(localReady) = 0;
2587         CpvAccess(remoteReady) = 0;
2588         CpvAccess(recvdRemote) = 0;
2589         CpvAccess(recvdProcChkp) = 0;
2590         CpvAccess(localChecksum) = 0;
2591         CpvAccess(remoteChecksum) = 0;
2592         CpvAccess(recvdArrayChkp) = 0;
2593
2594         notify_crash_fn = notify_crash;
2595
2596 #if ! CMK_CONVERSE_MPI
2597         // print pid to kill
2598         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2599         //  sleep(4);
2600 #endif
2601 #endif
2602       }
2603
2604
2605       extern "C"
2606         int CkHasCheckpoints()
2607         {
2608           return checkpointed;
2609         }
2610
2611       /// @todo: the following definitions should be moved to a separate file containing
2612       // structures and functions about fault tolerance strategies
2613
2614       /**
2615        *  * @brief: function for killing a process                                             
2616        *   */
2617 #ifdef CMK_MEM_CHECKPOINT
2618 #if CMK_HAS_GETPID
2619       void killLocal(void *_dummy,double curWallTime){
2620         if(CmiWallTimer()<killTime-1){
2621           printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2622           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2623         }else{ 
2624 #if CMK_CONVERSE_MPI
2625           printf("[%d][%d] KillLocal called at %.6lf \n",CmiMyPartition(),CkMyPe(),CmiWallTimer());          
2626           CkDieNow();
2627 #else 
2628           kill(getpid(),SIGKILL);                                               
2629 #endif
2630         }              
2631       } 
2632 #else
2633       void killLocal(void *_dummy,double curWallTime){
2634         CmiAbort("kill() not supported!");
2635       }
2636 #endif
2637       void injectSoftFailure(void *_dummy,double curWallTime){
2638         if(!CkInCheckpointing()&&!CkInRestarting()){
2639           CkPrintf("soft failure injected\n");
2640           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->softFailureInjected = true;
2641         }else{
2642           //next failure
2643           CcdCallFnAfter(injectSoftFailure,NULL,2*1000);
2644         }
2645       }
2646 #endif
2647
2648 #ifdef CMK_MEM_CHECKPOINT
2649       /**
2650        * @brief: reads the file with the kill information
2651        */
2652       void readKillFile(){
2653         FILE *fp=fopen(killFile,"r");
2654         if(!fp){
2655           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2656           return;
2657         }
2658         int proc;
2659         double sec;
2660         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2661           if(proc == CkMyNode() && CkMyRank() == 0){
2662             killTime = CmiWallTimer()+sec;
2663             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2664             CcdCallFnAfter(killLocal,NULL,sec*1000);
2665           }
2666         }
2667         fclose(fp);
2668       }
2669
2670
2671 #if ! CMK_CONVERSE_MPI
2672       void CkDieNow()
2673       {
2674 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2675         // ignored for non-mpi version
2676         CmiPrintf("[%d] die now.\n", CmiMyPe());
2677         killTime = CmiWallTimer()+0.001;
2678         CcdCallFnAfter(killLocal,NULL,1);
2679 #endif
2680       }
2681 #endif
2682
2683 #endif
2684
2685 #include "CkMemCheckpoint.def.h"
2686
2687