only one replica send checkpoints
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, _remoteCrashedNode);
78
79 // static, so that it is accessible from Converse part
80 int CkMemCheckPT::inRestarting = 0;
81 int CkMemCheckPT::inCheckpointing = 0;
82 int CkMemCheckPT::replicaAlive = 1;
83 int CkMemCheckPT::inLoadbalancing = 0;
84 double CkMemCheckPT::startTime;
85 char *CkMemCheckPT::stage;
86 CkCallback CkMemCheckPT::cpCallback;
87
88 int _memChkptOn = 1;                    // checkpoint is on or off
89
90 CkGroupID ckCheckPTGroupID;             // readonly
91
92 static int checkpointed = 0;
93
94 /// @todo the following declarations should be moved into a separate file for all 
95 // fault tolerant strategies
96
97 #ifdef CMK_MEM_CHECKPOINT
98 // name of the kill file that contains processes to be killed 
99 char *killFile;                                               
100 // flag for the kill file         
101 int killFlag=0;
102 // variable for storing the killing time
103 double killTime=0.0;
104 #endif
105
106 #ifdef CKLOCMGR_LOOP
107 #undef CKLOCMGR_LOOP
108 #endif
109 // loop over all CkLocMgr and do "code"
110 #define  CKLOCMGR_LOOP(code)    {       \
111   int numGroups = CkpvAccess(_groupIDTable)->size();    \
112   for(int i=0;i<numGroups;i++) {        \
113     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
114     if(obj->isLocMgr())  {      \
115       CkLocMgr *mgr = (CkLocMgr*)obj;   \
116       code      \
117     }   \
118   }     \
119 }
120
121 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
122 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
123 CpvDeclare(CkCheckPTMessage**, chkpBuf);
124 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
125 //store the checkpoint of the buddy to compare
126 //do not need the whole msg, can be the checksum
127 CpvDeclare(CkCheckPTMessage*, buddyBuf);
128 //pointer of the checkpoint going to be written
129 CpvDeclare(int, curPointer);
130 CpvDeclare(int, recvdRemote);
131 CpvDeclare(int, recvdLocal);
132 CpvDeclare(int, localChkpDone);
133 CpvDeclare(int, remoteChkpDone);
134 CpvDeclare(int, remoteStarted);
135 CpvDeclare(int, localStarted);
136 CpvDeclare(int, recvdArrayChkp);
137 CpvDeclare(int, recvdProcChkp);
138 CpvDeclare(int, localChecksum);
139 CpvDeclare(int, remoteChecksum);
140
141 bool compare(char * buf1, char * buf2);
142 int getChecksum(char * buf);
143 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
144 // Converse function handles
145 static int askPhaseHandlerIdx;
146 static int recvPhaseHandlerIdx;
147 static int askProcDataHandlerIdx;
148 static int restartBcastHandlerIdx;
149 static int recoverProcDataHandlerIdx;
150 static int restartBeginHandlerIdx;
151 static int recvRemoteChkpHandlerIdx;
152 static int replicaDieHandlerIdx;
153 static int replicaChkpStartHandlerIdx;
154 static int replicaDieBcastHandlerIdx;
155 static int replicaRecoverHandlerIdx;
156 static int replicaChkpDoneHandlerIdx;
157 static int recoverRemoteProcDataHandlerIdx;
158 static int recoverRemoteArrayDataHandlerIdx;
159 static int notifyHandlerIdx;
160 // compute the backup processor
161 // FIXME: avoid crashed processors
162 #if CMK_CONVERSE_MPI
163 static int pingHandlerIdx;
164 static int pingCheckHandlerIdx;
165 static int buddyDieHandlerIdx;
166 static double lastPingTime = -1;
167
168 extern "C" void mpi_restart_crashed(int pe, int rank);
169 extern "C" int  find_spare_mpirank(int pe,int partition);
170
171 void pingBuddy();
172 void pingCheckHandler();
173
174 #endif
175 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
176
177 inline int CkMemCheckPT::BuddyPE(int pe)
178 {
179   int budpe;
180 #if NODE_CHECKPOINT
181   // buddy is the processor with same rank on the next physical node
182   int r1 = CmiPhysicalRank(pe);
183   int budnode = CmiPhysicalNodeID(pe);
184   do {
185     budnode = (budnode+1)%CmiNumPhysicalNodes();
186     int *pelist;
187     int num;
188     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
189     budpe = pelist[r1 % num];
190   } while (isFailed(budpe));
191   if (budpe == pe) {
192     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
193     CmiAbort("Failed to find a buddy processor");
194   }
195 #else
196   budpe = pe;
197   budpe = (budpe+1)%CkNumPes();
198 #endif
199   return budpe;
200 }
201
202 // called in array element constructor
203 // choose and register with 2 buddies for checkpoiting 
204 #if CMK_MEM_CHECKPOINT
205 void ArrayElement::init_checkpt() {
206   if (_memChkptOn == 0) return;
207   if (CkInRestarting()) {
208     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
209   }
210   // only master init checkpoint
211   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
212
213   budPEs[0] = CkMyPe();
214   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
215   CmiAssert(budPEs[0] != budPEs[1]);
216   // inform checkPTMgr
217   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
218   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
219   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
220   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
221 }
222 #endif
223
224 // entry function invoked by checkpoint mgr asking for checkpoint data
225 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
226 #if CMK_MEM_CHECKPOINT
227   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
228   //char index[128];   thisIndexMax.sprint(index);
229   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
230   CkLocMgr *locMgr = thisArray->getLocMgr();
231   CmiAssert(myRec!=NULL);
232   int size;
233   {
234     PUP::sizer p;
235     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
236     size = p.size();
237   }
238   int packSize = size/sizeof(double) +1;
239   CkArrayCheckPTMessage *msg =
240     new (packSize, 0) CkArrayCheckPTMessage;
241   msg->len = size;
242   msg->index =thisIndexMax;
243   msg->aid = thisArrayID;
244   msg->locMgr = locMgr->getGroupID();
245   msg->cp_flag = 1;
246   {
247     PUP::toMem p(msg->packData);
248     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
249   }
250
251   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
252   checkptMgr.recvData(msg, 2, budPEs);
253   delete m;
254 #endif
255 }
256
257 // checkpoint holder class - for memory checkpointing
258 class CkMemCheckPTInfo: public CkCheckPTInfo
259 {
260   CkArrayCheckPTMessage *ckBuffer;
261   public:
262   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
263     CkCheckPTInfo(a, loc, idx, pno)
264   {
265     ckBuffer = NULL;
266   }
267   ~CkMemCheckPTInfo() 
268   {
269     if (ckBuffer) delete ckBuffer; 
270   }
271   inline void updateBuffer(CkArrayCheckPTMessage *data) 
272   {
273     CmiAssert(data!=NULL);
274     if (ckBuffer) delete ckBuffer;
275     ckBuffer = data;
276   }    
277   inline CkArrayCheckPTMessage * getCopy()
278   {
279     if (ckBuffer == NULL) {
280       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
281       CmiAbort("Abort!");
282     }
283     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
284   }     
285   inline void updateBuddy(int b1, int b2) {
286     CmiAssert(ckBuffer);
287     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
288     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
289     CmiAssert(pNo != CkMyPe());
290   }
291   inline int getSize() { 
292     CmiAssert(ckBuffer);
293     return ckBuffer->len; 
294   }
295 };
296
297 // checkpoint holder class - for in-disk checkpointing
298 class CkDiskCheckPTInfo: public CkCheckPTInfo 
299 {
300   char *fname;
301   int bud1, bud2;
302   int len;                      // checkpoint size
303   public:
304   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
305   {
306 #if CMK_USE_MKSTEMP
307     fname = new char[64];
308     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
309     mkstemp(fname);
310 #else
311     fname=tmpnam(NULL);
312 #endif
313     bud1 = bud2 = -1;
314     len = 0;
315   }
316   ~CkDiskCheckPTInfo() 
317   {
318     remove(fname);
319   }
320   inline void updateBuffer(CkArrayCheckPTMessage *data) 
321   {
322     double t = CmiWallTimer();
323     // unpack it
324     envelope *env = UsrToEnv(data);
325     CkUnpackMessage(&env);
326     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
327     FILE *f = fopen(fname,"wb");
328     PUP::toDisk p(f);
329     CkPupMessage(p, (void **)&data);
330     // delay sync to the end because otherwise the messages are blocked
331     //    fsync(fileno(f));
332     fclose(f);
333     bud1 = data->bud1;
334     bud2 = data->bud2;
335     len = data->len;
336     delete data;
337     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
338   }
339   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
340   {
341     CkArrayCheckPTMessage *data;
342     FILE *f = fopen(fname,"rb");
343     PUP::fromDisk p(f);
344     CkPupMessage(p, (void **)&data);
345     fclose(f);
346     data->bud1 = bud1;                          // update the buddies
347     data->bud2 = bud2;
348     return data;
349   }
350   inline void updateBuddy(int b1, int b2) {
351     bud1 = b1; bud2 = b2;
352     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
353     CmiAssert(pNo != CkMyPe());
354   }
355   inline int getSize() { 
356     return len; 
357   }
358 };
359
360 CkMemCheckPT::CkMemCheckPT(int w)
361 {
362   int numnodes = 0;
363 #if NODE_CHECKPOINT
364   numnodes = CmiNumPhysicalNodes();
365 #else
366   numnodes = CkNumPes();
367 #endif
368 #if CK_NO_PROC_POOL
369   if (numnodes <= 2)
370 #else
371     if (numnodes  == 1)
372 #endif
373     {
374       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
375       _memChkptOn = 0;
376     }
377   inRestarting = 0;
378   inCheckpointing = 0;
379   recvCount = peCount = 0;
380   ackCount = 0;
381   expectCount = -1;
382   where = w;
383   replicaAlive = 1;
384   notifyReplica = 0;
385 #if CMK_CONVERSE_MPI
386   void pingBuddy();
387   void pingCheckHandler();
388   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
389   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
390 #endif
391   chkpTable[0] = NULL;
392   chkpTable[1] = NULL;
393   maxIter = -1;
394   recvIterCount = 0;
395   localDecided = false;
396 }
397
398 CkMemCheckPT::~CkMemCheckPT()
399 {
400   int len = ckTable.length();
401   for (int i=0; i<len; i++) {
402     delete ckTable[i];
403   }
404 }
405
406 void CkMemCheckPT::pup(PUP::er& p) 
407
408   CBase_CkMemCheckPT::pup(p); 
409   p|cpStarter;
410   p|thisFailedPe;
411   p|failedPes;
412   p|ckCheckPTGroupID;           // recover global variable
413   p|cpCallback;                 // store callback
414   p|where;                      // where to checkpoint
415   p|peCount;
416   if (p.isUnpacking()) {
417     recvCount = 0;
418 #if CMK_CONVERSE_MPI
419     void pingBuddy();
420     void pingCheckHandler();
421     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
422     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
423 #endif
424     maxIter = -1;
425     recvIterCount = 0;
426     localDecided = false;
427   }
428 }
429
430 void CkMemCheckPT::getIter(){
431   localDecided = true;
432   localMaxIter = maxIter+1;
433   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
434   int elemCount = CkCountChkpSyncElements();
435   if(elemCount == 0){
436     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
437   }
438 }
439
440 //when one replica failes, decide the next chkp iter
441
442 void CkMemCheckPT::recvIter(int iter){
443   if(maxIter<iter){
444     maxIter=iter;
445   }
446 }
447
448 void CkMemCheckPT::recvMaxIter(int iter){
449   localDecided = false;
450   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
451 }
452
453 void CkMemCheckPT::reachChkpIter(){
454   recvIterCount++;
455   elemCount = CkCountChkpSyncElements();
456   if(recvIterCount == elemCount){
457     recvIterCount = 0;
458     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
459   }
460 }
461
462 void CkMemCheckPT::startChkp(){
463   CkPrintf("start checkpoint\n");
464   CkStartMemCheckpoint(cpCallback);
465 }
466
467 // called by checkpoint mgr to restore an array element
468 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
469 {
470 #if CMK_MEM_CHECKPOINT
471   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
472   // m->index.print();
473   PUP::fromMem p(m->packData);
474   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
475   CmiAssert(mgr);
476 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
477   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
478 #else
479   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
480 #endif
481
482   // find a list of array elements bound together
483   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
484   CmiAssert(elt);
485   CkLocRec_local *rec = elt->myRec;
486   CkVec<CkMigratable *> list;
487   mgr->migratableList(rec, list);
488   CmiAssert(list.length() > 0);
489   for (int l=0; l<list.length(); l++) {
490     elt = (ArrayElement *)list[l];
491     elt->budPEs[0] = m->bud1;
492     elt->budPEs[1] = m->bud2;
493     //    reset, may not needed now
494     // for now.
495     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
496       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
497       if (c) c->redNo = 0;
498     }
499   }
500 #endif
501 }
502
503 // return 1 if pe is a crashed processor
504 int CkMemCheckPT::isFailed(int pe)
505 {
506   for (int i=0; i<failedPes.length(); i++)
507     if (failedPes[i] == pe) return 1;
508   return 0;
509 }
510
511 // add pe into history list of all failed processors
512 void CkMemCheckPT::failed(int pe)
513 {
514   if (isFailed(pe)) return;
515   failedPes.push_back(pe);
516 }
517
518 int CkMemCheckPT::totalFailed()
519 {
520   return failedPes.length();
521 }
522
523 // create an checkpoint entry for array element of aid with index.
524 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
525 {
526   // error check, no duplicate
527   int idx, len = ckTable.size();
528   for (idx=0; idx<len; idx++) {
529     CkCheckPTInfo *entry = ckTable[idx];
530     if (index == entry->index) {
531       if (loc == entry->locMgr) {
532         // bindTo array elements
533         return;
534       }
535       // for array inheritance, the following check may fail
536       // because ArrayElement constructor of all superclasses are called
537       if (aid == entry->aid) {
538         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
539         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
540       }
541     }
542   }
543   CkCheckPTInfo *newEntry;
544   if (where == CkCheckPoint_inMEM)
545     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
546   else
547     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
548   ckTable.push_back(newEntry);
549   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
550 }
551
552 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
553 {
554 #if !CMK_CHKP_ALL       
555   int buddy = msg->bud1;
556   if (buddy == CkMyPe()) buddy = msg->bud2;
557   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
558   recvData(msg);
559   // ack
560   thisProxy[buddy].gotData();
561 #else
562   chkpTable[0] = NULL;
563   chkpTable[1] = NULL;
564   recvArrayCheckpoint(msg);
565   thisProxy[msg->bud2].gotData();
566 #endif
567 }
568
569 // loop through my checkpoint table and ask checkpointed array elements
570 // to send me checkpoint data.
571 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
572 void CkMemCheckPT::doItNow(int starter)
573 {
574   checkpointed = 1;
575   //cpCallback = cb;
576   cpStarter = starter;
577   inCheckpointing = 1;
578   if (CkMyPe() == cpStarter) {
579     startTime = CmiWallTimer();
580     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
581   }
582 #if CMK_CONVERSE_MPI
583   if(CmiNumPartition()==1)
584 #endif    
585   {
586
587 #if !CMK_CHKP_ALL
588     int len = ckTable.length();
589     for (int i=0; i<len; i++) {
590       CkCheckPTInfo *entry = ckTable[i];
591       // always let the bigger number processor send request
592       //if (CkMyPe() < entry->pNo) continue;
593       // always let the smaller number processor send request, may on same proc
594       if (!isMaster(entry->pNo)) continue;
595       // call inmem_checkpoint to the array element, ask it to send
596       // back checkpoint data via recvData().
597       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
598       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
599     }
600     // if my table is empty, then I am done
601     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
602 #else
603     startArrayCheckpoint();
604 #endif
605     sendProcData();
606   }
607 #if CMK_CONVERSE_MPI
608   else
609   {
610     startCheckpoint();
611   }
612 #endif
613   // pack and send proc level data
614 }
615
616 class MemElementPacker : public CkLocIterator{
617   private:
618     //    CkLocMgr *locMgr;
619     PUP::er &p;
620     std::map<CkHashCode,CkLocation> arrayMap;
621   public:
622     MemElementPacker(PUP::er &p_):p(p_){};
623     void addLocation(CkLocation &loc){
624       CkArrayIndexMax idx = loc.getIndex();
625       arrayMap[idx.hash()] = loc;
626     }
627     void writeCheckpoint(){
628       std::map<CkHashCode, CkLocation>::iterator it;
629       for(it = arrayMap.begin();it!=arrayMap.end();it++){
630         CkLocation loc = it->second;
631         CkLocMgr *locMgr = loc.getManager();
632         CkArrayIndexMax idx = loc.getIndex();
633         CkGroupID gID = locMgr->ckGetGroupID();
634         p|gID;
635         p|idx;
636         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
637       }
638     }
639 };
640
641 void pupAllElements(PUP::er &p){
642 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
643   int numElements;
644   if(!p.isUnpacking()){
645     numElements = CkCountArrayElements();
646   }
647   p | numElements;
648   if(!p.isUnpacking()){
649     MemElementPacker packer(p);
650     CKLOCMGR_LOOP(mgr->iterate(packer););
651     packer.writeCheckpoint();
652   }
653 #endif
654 }
655
656 void CkMemCheckPT::startArrayCheckpoint(){
657 #if CMK_CHKP_ALL
658   int size;
659   {
660     PUP::sizer psizer;
661     pupAllElements(psizer);
662     size = psizer.size();
663   }
664   int packSize = size/sizeof(double)+1;
665   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
666   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
667   msg->len = size;
668   msg->cp_flag = 1;
669   int budPEs[2];
670   msg->bud1=CkMyPe();
671   msg->bud2=ChkptOnPe(CkMyPe());
672   {
673     PUP::toMem p(msg->packData);
674     pupAllElements(p);
675   }
676   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
677   if(chkpTable[0]) delete chkpTable[0];
678   chkpTable[0] = msg;
679   //send the checkpoint to my 
680   recvCount++;
681 #endif
682 }
683
684 void CkMemCheckPT::startCheckpoint(){
685 #if CMK_CONVERSE_MPI
686   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
687     CkPrintf("in start checkpointing!!!!\n");
688   int size;
689   {
690     PUP::sizer p;
691     _handleProcData(p,CmiFalse);
692     size = p.size();
693   }
694   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
695   procMsg->len = size;
696   procMsg->cp_flag = 1;
697   {
698     PUP::toMem p(procMsg->packData);
699     _handleProcData(p,CmiFalse);
700   }
701   int pointer = CpvAccess(curPointer);
702   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
703   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
704
705   {
706     PUP::sizer psizer;
707     pupAllElements(psizer);
708     size = psizer.size();
709   }
710   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
711   msg->len = size;
712   msg->cp_flag = 1;
713   int checksum;
714   {
715     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
716       PUP::checker p(msg->packData);
717       pupAllElements(p);
718       checksum = p.getChecksum();
719     }else{  
720 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
721       PUP::toMem p(msg->packData);
722       pupAllElements(p);
723     }
724   }
725   pointer = CpvAccess(curPointer);
726   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
727   CpvAccess(chkpBuf)[pointer] = msg;
728   if(CkMyPe()==0)
729     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
730   if(CkReplicaAlive()==1){
731     CpvAccess(recvdLocal) = 1;
732     if(CpvAccess(use_checksum)){
733       CpvAccess(localChecksum) = checksum;
734       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
735       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
736       //only one reaplica will send
737       if(CmiMyPartition()==0){
738         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
739         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
740       }
741     }else{
742       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
743       CkPackMessage(&env);
744       if(CmiMyPartition()==0){
745         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
746         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
747       }
748     }
749     if(CmiMyPartition()==0){
750       notifyReplica = 1;
751       thisProxy[CkMyPe()].doneComparison(true);
752     }
753   }
754   if(CpvAccess(recvdRemote)==1){
755     //only partition 1 will do it
756     //compare the checkpoint 
757     int size = CpvAccess(chkpBuf)[pointer]->len;
758     if(CpvAccess(use_checksum)){
759       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
760         thisProxy[CkMyPe()].doneComparison(true);
761       }
762       else{
763         thisProxy[CkMyPe()].doneComparison(false);
764       }
765     }else{
766       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
767         thisProxy[CkMyPe()].doneComparison(true);
768       }
769       else{
770         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
771         thisProxy[CkMyPe()].doneComparison(false);
772       }
773     }
774     if(CkMyPe()==0)
775       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
776   }
777   else{
778     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
779       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
780       { 
781         int pointer = CpvAccess(curPointer);
782         //send the proc data
783         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
784         procMsg->pointer = pointer;
785         envelope * env = (envelope *)(UsrToEnv(procMsg));
786         CkPackMessage(&env);
787         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
788         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
789         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
790           CkPrintf("[%d] sendProcdata\n",CkMyPe());
791         }
792       }
793       //send the array checkpoint data
794       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
795       msg->pointer = CpvAccess(curPointer);
796       envelope * env = (envelope *)(UsrToEnv(msg));
797       CkPackMessage(&env);
798       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
799       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
800       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
801         CkPrintf("[%d] sendArraydata\n",CkMyPe());
802       //can continue work, no need to wait for my replica
803       int _ret = 1;
804       notifyReplica = 1;
805       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
806       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
807     }
808   }
809 #endif
810 }
811
812 void CkMemCheckPT::doneComparison(bool ret){
813   int _ret = 1;
814   if(!ret){
815     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
816     _ret = 0;
817   }else{
818     _ret = 1;
819   }
820   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
821   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
822 }
823
824 void CkMemCheckPT::doneRComparison(int ret){
825   //    if(CpvAccess(curPointer) == 0){
826   //if(ret==CkNumPes()){
827     CpvAccess(localChkpDone) = 1;
828     if(CpvAccess(remoteChkpDone) ==1){
829       thisProxy.doneBothComparison();
830     }else{
831       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
832     }
833     if(notifyReplica == 0){
834       //notify the replica am done
835       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
836       *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
837       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
838       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
839       notifyReplica = 1;
840     }
841  // }
842  /* else{
843     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
844     startTime = CmiWallTimer();
845     thisProxy.RollBack();
846   }*/
847 }
848
849 void CkMemCheckPT::doneBothComparison(){
850   CpvAccess(recvdRemote) = 0;
851   CpvAccess(recvdLocal) = 0;
852   CpvAccess(localChkpDone) = 0;
853   CpvAccess(remoteChkpDone) = 0;
854   CpvAccess(remoteStarted) = 0;
855   CpvAccess(localStarted) = 0;
856   CpvAccess(_remoteCrashedNode) = -1;
857   CkMemCheckPT::replicaAlive = 1;
858   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
859   CpvAccess(curPointer)^=1;
860   inCheckpointing = 0;
861   notifyReplica = 0;
862   if(CkMyPe() == 0){
863     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
864   }
865   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
866 }
867
868 void CkMemCheckPT::RollBack(){
869   //restore group data
870   checkpointed = 0;
871   CkMemCheckPT::inRestarting = 1;
872   int pointer = CpvAccess(curPointer)^1;//use the previous one
873   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
874   PUP::fromMem p(chkpMsg->packData);    
875
876   //destroy array elements
877   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
878   int numGroups = CkpvAccess(_groupIDTable)->size();
879   for(int i=0;i<numGroups;i++) {
880     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
881     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
882     obj->flushStates();
883     obj->ckJustMigrated();
884   }
885   //restore array elements
886
887   int numElements;
888   p|numElements;
889
890   if(p.isUnpacking()){
891     for(int i=0;i<numElements;i++){
892       CkGroupID gID;
893       CkArrayIndex idx;
894       p|gID;
895       p|idx;
896       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
897       CmiAssert(mgr);
898       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
899     }
900     }
901     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
902     contribute(cb);
903   }
904
905   void CkMemCheckPT::notifyReplicaDie(int pe){
906     replicaAlive = 0;
907     CpvAccess(_remoteCrashedNode) = pe;
908   }
909
910   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
911   {
912 #if CMK_CHKP_ALL
913     int idx = 1;
914     if(msg->bud1 == CkMyPe()){
915       idx = 0;
916     }
917     int isChkpting = msg->cp_flag;
918     if(isChkpting == 1){
919       if(chkpTable[idx]) delete chkpTable[idx];
920     }
921     chkpTable[idx] = msg;
922     if(isChkpting){
923       recvCount++;
924       if(recvCount == 2){
925         if (where == CkCheckPoint_inMEM) {
926           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
927         }
928         else if (where == CkCheckPoint_inDISK) {
929           // another barrier for finalize the writing using fsync
930           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
931           contribute(0,NULL,CkReduction::sum_int,localcb);
932         }
933         else
934           CmiAbort("Unknown checkpoint scheme");
935         recvCount = 0;
936       }
937     }
938 #endif
939   }
940
941   // don't handle array elements
942   static inline void _handleProcData(PUP::er &p, CmiBool create)
943   {
944     // save readonlys, and callback BTW
945     CkPupROData(p);
946
947     // save mainchares 
948     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
949
950 #ifndef CMK_CHARE_USE_PTR
951     // save non-migratable chare
952     CkPupChareData(p);
953 #endif
954
955     // save groups into Groups.dat
956     CkPupGroupData(p,create);
957
958     // save nodegroups into NodeGroups.dat
959     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
960   }
961
962   void CkMemCheckPT::sendProcData()
963   {
964     // find out size of buffer
965     int size;
966     {
967       PUP::sizer p;
968       _handleProcData(p,CmiTrue);
969       size = p.size();
970     }
971     int packSize = size;
972     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
973     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
974     {
975       PUP::toMem p(msg->packData);
976       _handleProcData(p,CmiTrue);
977     }
978     msg->pe = CkMyPe();
979     msg->len = size;
980     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
981     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
982   }
983
984   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
985   {
986     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
987     CpvAccess(procChkptBuf) = msg;
988     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
989     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
990   }
991
992   // ArrayElement call this function to give us the checkpointed data
993   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
994   {
995     int len = ckTable.length();
996     int idx;
997     for (idx=0; idx<len; idx++) {
998       CkCheckPTInfo *entry = ckTable[idx];
999       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1000     }
1001     CkAssert(idx < len);
1002     int isChkpting = msg->cp_flag;
1003     ckTable[idx]->updateBuffer(msg);
1004     if (isChkpting) {
1005       // all my array elements have returned their inmem data
1006       // inform starter processor that I am done.
1007       recvCount ++;
1008       if (recvCount == ckTable.length()) {
1009         if (where == CkCheckPoint_inMEM) {
1010           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1011         }
1012         else if (where == CkCheckPoint_inDISK) {
1013           // another barrier for finalize the writing using fsync
1014           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1015           contribute(0,NULL,CkReduction::sum_int,localcb);
1016         }
1017         else
1018           CmiAbort("Unknown checkpoint scheme");
1019         recvCount = 0;
1020       } 
1021     }
1022   }
1023
1024   // only used in disk checkpointing
1025   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1026   {
1027     delete m;
1028 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1029     system("sync");
1030 #endif
1031     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1032   }
1033
1034   // only is called on cpStarter when checkpoint is done
1035   void CkMemCheckPT::cpFinish()
1036   {
1037     CmiAssert(CkMyPe() == cpStarter);
1038     peCount++;
1039     // now that all processors have finished, activate callback
1040     if (peCount == 2) 
1041     {
1042       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1043       peCount = 0;
1044       thisProxy.report();
1045     }
1046   }
1047
1048   // for debugging, report checkpoint info
1049   void CkMemCheckPT::report()
1050   {
1051     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1052     inCheckpointing = 0;
1053 #if !CMK_CHKP_ALL
1054     int objsize = 0;
1055     int len = ckTable.length();
1056     for (int i=0; i<len; i++) {
1057       CkCheckPTInfo *entry = ckTable[i];
1058       CmiAssert(entry);
1059       objsize += entry->getSize();
1060     }
1061     CmiAssert(CpvAccess(procChkptBuf));
1062     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1063 #else
1064     if(CkMyPe()==0)
1065       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1066 #endif
1067   }
1068
1069   /*****************************************************************************
1070     RESTART Procedure
1071    *****************************************************************************/
1072
1073   // master processor of two buddies
1074   inline int CkMemCheckPT::isMaster(int buddype)
1075   {
1076 #if 0
1077     int mype = CkMyPe();
1078     //CkPrintf("ismaster: %d %d\n", pe, mype);
1079     if (CkNumPes() - totalFailed() == 2) {
1080       return mype > buddype;
1081     }
1082     for (int i=1; i<CkNumPes(); i++) {
1083       int me = (buddype+i)%CkNumPes();
1084       if (isFailed(me)) continue;
1085       if (me == mype) return 1;
1086       else return 0;
1087     }
1088     return 0;
1089 #else
1090     // smaller one
1091     int mype = CkMyPe();
1092     //CkPrintf("ismaster: %d %d\n", pe, mype);
1093     if (CkNumPes() - totalFailed() == 2) {
1094       return mype < buddype;
1095     }
1096 #if NODE_CHECKPOINT
1097     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1098     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1099 #else
1100       for (int i=1; i<CkNumPes(); i++) {
1101 #endif
1102         int me = (mype+i)%CkNumPes();
1103         if (isFailed(me)) continue;
1104         if (me == buddype) return 1;
1105         else return 0;
1106       }
1107       return 0;
1108 #endif
1109     }
1110
1111
1112
1113 #if 0
1114     // helper class to pup all elements that belong to same ckLocMgr
1115     class ElementDestoryer : public CkLocIterator {
1116       private:
1117         CkLocMgr *locMgr;
1118       public:
1119         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1120         void addLocation(CkLocation &loc) {
1121           CkArrayIndex idx=loc.getIndex();
1122           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1123           loc.destroy();
1124         }
1125     };
1126 #endif
1127
1128     // restore the bitmap vector for LB
1129     void CkMemCheckPT::resetLB(int diepe)
1130     {
1131 #if CMK_LBDB_ON
1132       int i;
1133       char *bitmap = new char[CkNumPes()];
1134       // set processor available bitmap
1135       get_avail_vector(bitmap);
1136       for (i=0; i<failedPes.length(); i++)
1137         bitmap[failedPes[i]] = 0; 
1138       bitmap[diepe] = 0;
1139
1140 #if CK_NO_PROC_POOL
1141       set_avail_vector(bitmap);
1142 #endif
1143
1144       // if I am the crashed pe, rebuild my failedPEs array
1145       if (CkMyNode() == diepe)
1146         for (i=0; i<CkNumPes(); i++) 
1147           if (bitmap[i]==0) failed(i);
1148
1149       delete [] bitmap;
1150 #endif
1151     }
1152
1153     static double restartT;
1154
1155     // in case when failedPe dies, everybody go through its checkpoint table:
1156     // destory all array elements
1157     // recover lost buddies
1158     // reconstruct all array elements from check point data
1159     // called on all processors
1160     void CkMemCheckPT::restart(int diePe)
1161     {
1162 #if CMK_MEM_CHECKPOINT
1163       double curTime = CmiWallTimer();
1164       if (CkMyPe() == diePe){
1165         restartT = CmiWallTimer();
1166         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1167       }
1168       stage = (char*)"resetLB";
1169       startTime = curTime;
1170       if (CkMyPe() == diePe)
1171         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1172
1173 #if CK_NO_PROC_POOL
1174       failed(diePe);    // add into the list of failed pes
1175 #endif
1176       thisFailedPe = diePe;
1177
1178       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1179
1180       inRestarting = 1;
1181
1182       // disable load balancer's barrier
1183       //if (CkMyPe() != diePe) resetLB(diePe);
1184
1185       CKLOCMGR_LOOP(mgr->startInserting(););
1186
1187
1188       if(CmiNumPartition()==1){
1189         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1190       }else{
1191         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1192         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1193       }
1194 #endif
1195     }
1196
1197     // loally remove all array elements
1198     void CkMemCheckPT::removeArrayElements()
1199     {
1200 #if CMK_MEM_CHECKPOINT
1201       int len = ckTable.length();
1202       double curTime = CmiWallTimer();
1203       if (CkMyPe() == thisFailedPe) 
1204         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1205       stage = (char*)"removeArrayElements";
1206       startTime = curTime;
1207
1208       //  if (cpCallback.isInvalid()) 
1209       //          CkPrintf("invalid pe %d\n",CkMyPe());
1210       //          CkAbort("Didn't set restart callback\n");;
1211       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1212
1213       // get rid of all buffering and remote recs
1214       // including destorying all array elements
1215 #if CK_NO_PROC_POOL  
1216       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1217 #else
1218       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1219 #endif
1220       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1221 #endif
1222     }
1223
1224     // flush state in reduction manager
1225     void CkMemCheckPT::resetReductionMgr()
1226     {
1227       if (CkMyPe() == thisFailedPe) 
1228         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1229       int numGroups = CkpvAccess(_groupIDTable)->size();
1230       for(int i=0;i<numGroups;i++) {
1231         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1232         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1233         obj->flushStates();
1234         obj->ckJustMigrated();
1235       }
1236       // reset again
1237       //CpvAccess(_qd)->flushStates();
1238       if(CmiNumPartition()==1){
1239         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1240       }
1241       else
1242         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1243     }
1244
1245     // recover the lost buddies
1246     void CkMemCheckPT::recoverBuddies()
1247     {
1248       int idx;
1249       int len = ckTable.length();
1250       // ready to flush reduction manager
1251       // cannot be CkMemCheckPT::restart because destory will modify states
1252       double curTime = CmiWallTimer();
1253       if (CkMyPe() == thisFailedPe)
1254         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1255       stage = (char *)"recoverBuddies";
1256       if (CkMyPe() == thisFailedPe)
1257         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1258       startTime = curTime;
1259
1260       // recover buddies
1261       expectCount = 0;
1262 #if !CMK_CHKP_ALL
1263       for (idx=0; idx<len; idx++) {
1264         CkCheckPTInfo *entry = ckTable[idx];
1265         if (entry->pNo == thisFailedPe) {
1266 #if CK_NO_PROC_POOL
1267           // find a new buddy
1268           int budPe = BuddyPE(CkMyPe());
1269 #else
1270           int budPe = thisFailedPe;
1271 #endif
1272           CkArrayCheckPTMessage *msg = entry->getCopy();
1273           msg->bud1 = budPe;
1274           msg->bud2 = CkMyPe();
1275           msg->cp_flag = 0;            // not checkpointing
1276           thisProxy[budPe].recoverEntry(msg);
1277           expectCount ++;
1278         }
1279       }
1280 #else
1281       //send to failed pe
1282       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1283 #if CK_NO_PROC_POOL
1284         // find a new buddy
1285         int budPe = BuddyPE(CkMyPe());
1286 #else
1287         int budPe = thisFailedPe;
1288 #endif
1289         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1290         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1291         msg->cp_flag = 0;            // not checkpointing
1292         msg->bud1 = budPe;
1293         msg->bud2 = CkMyPe();
1294         thisProxy[budPe].recoverEntry(msg);
1295         expectCount ++;
1296       }
1297 #endif
1298
1299       if (expectCount == 0) {
1300         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1301       }
1302       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1303     }
1304
1305     void CkMemCheckPT::gotData()
1306     {
1307       ackCount ++;
1308       if (ackCount == expectCount) {
1309         ackCount = 0;
1310         expectCount = -1;
1311         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1312         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1313       }
1314     }
1315
1316     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1317     {
1318
1319       for (int i=0; i<n; i++) {
1320         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1321         mgr->updateLocation(idx[i], nowOnPe);
1322       }
1323       thisProxy[nowOnPe].gotReply();
1324     }
1325
1326     // restore array elements
1327     void CkMemCheckPT::recoverArrayElements()
1328     {
1329       double curTime = CmiWallTimer();
1330       int len = ckTable.length();
1331       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1332       stage = (char *)"recoverArrayElements";
1333       if (CkMyPe() == thisFailedPe)
1334         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1335       startTime = curTime;
1336       int flag = 0;
1337       // recover all array elements
1338       int count = 0;
1339
1340 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1341       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1342       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1343 #endif
1344
1345 #if !CMK_CHKP_ALL
1346       for (int idx=0; idx<len; idx++)
1347       {
1348         CkCheckPTInfo *entry = ckTable[idx];
1349 #if CK_NO_PROC_POOL
1350         // the bigger one will do 
1351         //    if (CkMyPe() < entry->pNo) continue;
1352         if (!isMaster(entry->pNo)) continue;
1353 #else
1354         // smaller one do it, which has the original object
1355         if (CkMyPe() == entry->pNo+1 || 
1356             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1357 #endif
1358         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1359
1360         entry->updateBuddy(CkMyPe(), entry->pNo);
1361         CkArrayCheckPTMessage *msg = entry->getCopy();
1362         // gzheng
1363         //thisProxy[CkMyPe()].inmem_restore(msg);
1364         inmem_restore(msg);
1365 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1366         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1367         int homePe = mgr->homePe(msg->index);
1368         if (homePe != CkMyPe()) {
1369           gmap[homePe].push_back(msg->locMgr);
1370           imap[homePe].push_back(msg->index);
1371         }
1372 #endif
1373         CkFreeMsg(msg);
1374         count ++;
1375       }
1376 #else
1377       char * packData;
1378       if(CmiNumPartition()==1){
1379         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1380         packData = (char *)msg->packData;
1381       }
1382       else{
1383         int pointer = CpvAccess(curPointer);
1384         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1385         packData = msg->packData;
1386       }
1387 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1388       recoverAll(packData,gmap,imap);
1389 #else
1390       recoverAll(packData);
1391 #endif
1392 #endif
1393 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1394       for (int i=0; i<CkNumPes(); i++) {
1395         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1396           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1397           flag++;       
1398         }
1399       }
1400       delete [] imap;
1401       delete [] gmap;
1402 #endif
1403       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1404
1405       CKLOCMGR_LOOP(mgr->doneInserting(););
1406
1407       // _crashedNode = -1;
1408       CpvAccess(_crashedNode) = -1;
1409 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1410       if (CkMyPe() == 0)
1411         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1412 #else
1413       if(flag == 0)
1414       {
1415         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1416       }
1417 #endif
1418     }
1419
1420     void CkMemCheckPT::gotReply(){
1421       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1422     }
1423
1424     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1425 #if CMK_CHKP_ALL
1426       PUP::fromMem p(packData);
1427       int numElements;
1428       p|numElements;
1429       if(p.isUnpacking()){
1430         for(int i=0;i<numElements;i++){
1431           CkGroupID gID;
1432           CkArrayIndex idx;
1433           p|gID;
1434           p|idx;
1435           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1436           int homePe = mgr->homePe(idx);
1437 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1438           mgr->resume(idx,p,CmiTrue,CmiTrue);
1439 #else
1440           if(CmiNumPartition()==1)
1441             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1442           else{
1443             if(CkMyPe()==thisFailedPe){
1444               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1445             }
1446             else{
1447               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1448             }
1449           }
1450 #endif
1451 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1452           homePe = mgr->homePe(idx);
1453           if (homePe != CkMyPe()) {
1454             gmap[homePe].push_back(gID);
1455             imap[homePe].push_back(idx);
1456           }
1457 #endif
1458         }
1459       }
1460 #endif
1461     }
1462
1463
1464     // on every processor
1465     // turn load balancer back on
1466     void CkMemCheckPT::finishUp()
1467     {
1468       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1469       //CKLOCMGR_LOOP(mgr->doneInserting(););
1470
1471       if (CkMyPe() == thisFailedPe)
1472       {
1473         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1474         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1475         fflush(stdout);
1476       }
1477       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1478       inRestarting = 0;
1479
1480 #if CMK_CONVERSE_MPI    
1481       if(CmiNumPartition()!=1){
1482         CpvAccess(recvdProcChkp) = 0;
1483         CpvAccess(recvdArrayChkp) = 0;
1484         CpvAccess(curPointer)^=1;
1485         //notify my replica, restart is done
1486         if (CkMyPe() == 0){
1487           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1488           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1489           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1490         }
1491       }
1492       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1493         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1494       }
1495 #endif
1496
1497 #if CK_NO_PROC_POOL
1498 #if NODE_CHECKPOINT
1499       int numnodes = CmiNumPhysicalNodes();
1500 #else
1501       int numnodes = CkNumPes();
1502 #endif
1503       if (numnodes-totalFailed() <=2) {
1504         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1505         _memChkptOn = 0;
1506       }
1507 #endif
1508     }
1509
1510     void CkMemCheckPT::recoverFromSoftFailure()
1511     {
1512       inRestarting = 0;
1513       CpvAccess(recvdRemote) = 0;
1514       CpvAccess(recvdLocal) = 0;
1515       CpvAccess(localChkpDone) = 0;
1516       CpvAccess(remoteChkpDone) = 0;
1517       inCheckpointing = 0;
1518       notifyReplica = 0;
1519       if(CkMyPe() == 0){
1520         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1521       }
1522       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1523     }
1524     // called only on 0
1525     void CkMemCheckPT::quiescence(CkCallback &cb)
1526     {
1527       static int pe_count = 0;
1528       pe_count ++;
1529       CmiAssert(CkMyPe() == 0);
1530       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1531       if (pe_count == CkNumPes()) {
1532         pe_count = 0;
1533         cb.send();
1534       }
1535     }
1536
1537     // User callable function - to start a checkpoint
1538     // callback cb is used to pass control back
1539     void CkStartMemCheckpoint(CkCallback &cb)
1540     {
1541 #if CMK_MEM_CHECKPOINT
1542       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1543       /*if (_memChkptOn == 0) {
1544         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1545         cb.send();
1546         return;
1547         }*/
1548       if (CkInRestarting()) {
1549         // trying to checkpointing during restart
1550         cb.send();
1551         return;
1552       }
1553       // store user callback and user data
1554       CkMemCheckPT::cpCallback = cb;
1555
1556
1557       //send to my replica that checkpoint begins 
1558       if(CkReplicaAlive()==1){
1559         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1560         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1561         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1562       }
1563       CpvAccess(localStarted) = 1;
1564       // broadcast to start check pointing
1565       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1566         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1567         checkptMgr.doItNow(CkMyPe());
1568       }
1569 #else
1570       // when mem checkpoint is disabled, invike cb immediately
1571       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1572       cb.send();
1573 #endif
1574     }
1575
1576     void CkRestartCheckPoint(int diePe)
1577     {
1578       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1579       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1580       // broadcast
1581       checkptMgr.restart(diePe);
1582     }
1583
1584     static int _diePE = -1;
1585
1586     // callback function used locally by ccs handler
1587     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1588     {
1589       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1590       CkRestartCheckPoint(_diePE);
1591     }
1592
1593
1594     // called on crashed PE
1595     static void restartBeginHandler(char *msg)
1596     {
1597       CmiFree(msg);
1598 #if CMK_MEM_CHECKPOINT
1599 #if CMK_USE_BARRIER
1600       if(CkMyPe()!=_diePE){
1601         printf("restar begin on %d\n",CkMyPe());
1602         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1603         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1604         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1605       }else{
1606         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1607         CkRestartCheckPointCallback(NULL, NULL);
1608       }
1609 #else
1610       static int count = 0;
1611       CmiAssert(CkMyPe() == _diePE);
1612       count ++;
1613       if (count == CkNumPes()) {
1614         printf("restart begin on %d\n",CkMyPe());
1615         CkRestartCheckPointCallback(NULL, NULL);
1616         count = 0;
1617       }
1618 #endif
1619 #endif
1620     }
1621
1622     extern void _discard_charm_message();
1623     extern void _resume_charm_message();
1624
1625     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1626       return data;
1627     }
1628
1629     static void restartBcastHandler(char *msg)
1630     {
1631 #if CMK_MEM_CHECKPOINT
1632       // advance phase counter
1633       CkMemCheckPT::inRestarting = 1;
1634       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1635       // gzheng
1636       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1637
1638       if (CkMyPe()==_diePE)
1639         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1640
1641       // reset QD counters
1642       /*  gzheng
1643           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1644        */
1645
1646       /*  gzheng
1647           if (CkMyPe()==_diePE)
1648           CkRestartCheckPointCallback(NULL, NULL);
1649        */
1650       CmiFree(msg);
1651
1652       _resume_charm_message();
1653
1654       // reduction
1655       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1656       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1657 #if CMK_USE_BARRIER
1658       //CmiPrintf("before reduce\n");   
1659       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1660       //CmiPrintf("after reduce\n");    
1661 #else
1662       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1663 #endif 
1664       checkpointed = 0;
1665 #endif
1666     }
1667
1668     extern void _initDone();
1669
1670     bool compare(char * buf1, char *buf2){
1671       if(CkMyPe()==0)
1672         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1673       PUP::checker pchecker(buf1,buf2);
1674       pchecker.skip();
1675       int numElements;
1676       pchecker|numElements;
1677       for(int i=0;i<numElements;i++){
1678         CkGroupID gID;
1679         CkArrayIndex idx;
1680
1681         pchecker|gID;
1682         pchecker|idx;
1683
1684         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1685         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1686       }
1687       if(CkMyPe()==0)
1688         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1689       //int fault_num = pchecker.getFaultNum();
1690       bool result = pchecker.getResult();
1691       /*if(!result){
1692         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1693       }*/
1694       return result;
1695     }
1696     int getChecksum(char * buf){
1697       PUP::checker pchecker(buf);
1698       pchecker.skip();
1699       int numElements;
1700       pchecker|numElements;
1701 //      CkPrintf("num %d\n",numElements);
1702       for(int i=0;i<numElements;i++){
1703         CkGroupID gID;
1704         CkArrayIndex idx;
1705
1706         pchecker|gID;
1707         pchecker|idx;
1708
1709         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1710         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1711       }
1712       return pchecker.getChecksum();
1713     }
1714
1715     static void recvRemoteChkpHandler(char *msg){
1716       CpvAccess(remoteChkpDone) = 1;
1717       if(CpvAccess(use_checksum)){
1718         if(CkMyPe()==0)
1719           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1720         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1721         CpvAccess(recvdRemote) = 1;
1722         if(CpvAccess(recvdLocal)==1){
1723           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1724             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1725           }
1726           else{
1727             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1728           }
1729         }
1730       }else{
1731         envelope *env = (envelope *)msg;
1732         CkUnpackMessage(&env);
1733         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1734         if(CpvAccess(recvdLocal)==1){
1735           int pointer = CpvAccess(curPointer);
1736           int size = CpvAccess(chkpBuf)[pointer]->len;
1737           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1738             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1739           }else
1740           {
1741             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1742           }
1743           delete chkpMsg;
1744           if(CkMyPe()==0)
1745             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1746         }else{
1747           CpvAccess(recvdRemote) = 1;
1748           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1749           CpvAccess(buddyBuf) = chkpMsg;
1750         }
1751       }
1752     }
1753
1754     static void replicaRecoverHandler(char *msg){
1755       //fflush(stdout);
1756       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1757       CpvAccess(remoteChkpDone) = 1;
1758       if(CpvAccess(localChkpDone) == 1)
1759         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1760       CmiFree(msg);
1761     }
1762
1763     static void replicaChkpDoneHandler(char *msg){
1764       CpvAccess(remoteChkpDone) = 1;
1765       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1766       if(CpvAccess(localChkpDone) == 1)
1767         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1768       CmiFree(msg);
1769     }
1770
1771     static void replicaDieHandler(char * msg){
1772 #if CMK_CONVERSE_MPI
1773       //broadcast to every one in my replica
1774       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1775       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1776 #endif
1777     }
1778
1779     static void replicaChkpStartHandler(char * msg){
1780       CpvAccess(remoteStarted) =1;
1781       if(CpvAccess(localStarted)==1){    
1782         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1783         checkptMgr.doItNow(0);
1784       }
1785     }
1786
1787
1788     static void replicaDieBcastHandler(char *msg){
1789       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1790       CpvAccess(_remoteCrashedNode) = diePe;
1791       CkMemCheckPT::replicaAlive = 0;
1792       if(CkMyPe()==diePe){
1793         CmiPrintf("pe %d in replicad word die\n",diePe);
1794         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1795         fflush(stdout);
1796       }
1797       find_spare_mpirank(diePe,CmiMyPartition()^1);
1798       //broadcast to my partition to get local max iter
1799       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1800       CmiFree(msg);
1801     }
1802
1803     static void recoverRemoteProcDataHandler(char *msg){
1804       envelope *env = (envelope *)msg;
1805       CkUnpackMessage(&env);
1806       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1807
1808       //store the checkpoint
1809       int pointer = procMsg->pointer;
1810
1811
1812       if(CkMyPe()==CpvAccess(_crashedNode)){
1813         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1814         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1815         PUP::fromMem p(procMsg->packData);
1816         _handleProcData(p,CmiTrue);
1817         _initDone();
1818       }
1819       else{
1820         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1821         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1822         //_handleProcData(p,CmiFalse);
1823       }
1824       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1825       CKLOCMGR_LOOP(mgr->startInserting(););
1826
1827       CpvAccess(recvdProcChkp) =1;
1828       if(CpvAccess(recvdArrayChkp)==1){
1829         _resume_charm_message();
1830         _diePE = CpvAccess(_crashedNode);
1831         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1832         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1833         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1834       }
1835     }
1836
1837     static void recoverRemoteArrayDataHandler(char *msg){
1838       envelope *env = (envelope *)msg;
1839       CkUnpackMessage(&env);
1840       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1841
1842       //store the checkpoint
1843       int pointer = chkpMsg->pointer;
1844       CpvAccess(curPointer) = pointer;
1845       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1846       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1847       CpvAccess(recvdArrayChkp) =1;
1848       CkMemCheckPT::inRestarting = 1;
1849       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1850         _resume_charm_message();
1851         _diePE = CpvAccess(_crashedNode);
1852         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1853         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1854         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1855         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1856       }
1857     }
1858
1859     static void recvPhaseHandler(char * msg)
1860     {
1861       CpvAccess(_curRestartPhase)--;
1862       CkMemCheckPT::inRestarting = 1;
1863       CmiFree(msg);
1864     }
1865     // called on crashed processor
1866     static void recoverProcDataHandler(char *msg)
1867     {
1868 #if CMK_MEM_CHECKPOINT
1869       int i;
1870       envelope *env = (envelope *)msg;
1871       CkUnpackMessage(&env);
1872       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1873       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1874       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1875       PUP::fromMem p(procMsg->packData);
1876       _handleProcData(p,CmiTrue);
1877
1878       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1879       // gzheng
1880       CKLOCMGR_LOOP(mgr->startInserting(););
1881
1882       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1883       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1884       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1885       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1886
1887       _initDone();
1888       //   CpvAccess(_qd)->flushStates();
1889       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1890 #endif
1891     }
1892     //for replica, got the phase number from my neighbor processor in the current partition
1893     static void askPhaseHandler(char *msg)
1894     {
1895 #if CMK_MEM_CHECKPOINT
1896       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1897       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1898       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1899       CmiSetHandler(msg, recvPhaseHandlerIdx);
1900       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1901 #endif
1902     }
1903     // called on its backup processor
1904     // get backup message buffer and sent to crashed processor
1905     static void askProcDataHandler(char *msg)
1906     {
1907 #if CMK_MEM_CHECKPOINT
1908       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1909       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1910       if (CpvAccess(procChkptBuf) == NULL)  {
1911         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1912         CkAbort("no checkpoint found");
1913       }
1914       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1915       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1916
1917       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1918
1919       CkPackMessage(&env);
1920       CmiSetHandler(env, recoverProcDataHandlerIdx);
1921       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1922       CpvAccess(procChkptBuf) = NULL;
1923       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1924 #endif
1925     }
1926
1927     // called on PE 0
1928     void qd_callback(void *m)
1929     {
1930       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1931       fflush(stdout);
1932       CkFreeMsg(m);
1933       if(CmiNumPartition()==1){
1934 #ifdef CMK_SMP
1935         for(int i=0;i<CmiMyNodeSize();i++){
1936           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1937           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1938           CmiSetHandler(msg, askProcDataHandlerIdx);
1939           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1940           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1941         }
1942         return;
1943 #endif
1944         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1945         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1946         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1947         CmiSetHandler(msg, askProcDataHandlerIdx);
1948         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1949         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1950       }
1951       else{
1952         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1953         CmiSetHandler(msg, recvPhaseHandlerIdx);
1954         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1955       }
1956     }
1957
1958     // on crashed node
1959     void CkMemRestart(const char *dummy, CkArgMsg *args)
1960     {
1961 #if CMK_MEM_CHECKPOINT
1962       _diePE = CmiMyNode();
1963       CpvAccess( _crashedNode )= CmiMyNode();
1964       CkMemCheckPT::inRestarting = 1;
1965       _discard_charm_message();
1966
1967       /*if(CmiMyRank()==0){
1968         CkCallback cb(qd_callback);
1969         CkStartQD(cb);
1970         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1971         }*/
1972       CkMemCheckPT::startTime = restartT = CmiWallTimer();
1973       if(CmiNumPartition()==1){
1974         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1975         restartT = CmiWallTimer();
1976         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1977         fflush(stdout);
1978         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1979         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1980         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1981         CmiSetHandler(msg, askProcDataHandlerIdx);
1982         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1983         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1984       }
1985       else{
1986         CkCallback cb(qd_callback);
1987         CkStartQD(cb);
1988       }
1989 #else
1990       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1991 #endif
1992     }
1993
1994     // can be called in other files
1995     // return true if it is in restarting
1996     extern "C"
1997       int CkInRestarting()
1998       {
1999 #if CMK_MEM_CHECKPOINT
2000         if (CpvAccess( _crashedNode)!=-1) return 1;
2001         // gzheng
2002         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2003         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2004         return CkMemCheckPT::inRestarting;
2005 #else
2006         return 0;
2007 #endif
2008       }
2009
2010     extern "C"
2011       int CkReplicaAlive()
2012       {
2013         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2014         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2015         return CkMemCheckPT::replicaAlive;
2016
2017         /*if(CkMemCheckPT::replicaDead==1)
2018           return 0;
2019           else          
2020           return 1;*/
2021       }
2022
2023     extern "C"
2024       int CkInCheckpointing()
2025       {
2026         return CkMemCheckPT::inCheckpointing;
2027       }
2028
2029     extern "C"
2030       void CkSetInLdb(){
2031 #if CMK_MEM_CHECKPOINT
2032         CkMemCheckPT::inLoadbalancing = 1;
2033 #endif
2034       }
2035
2036     extern "C"
2037       int CkInLdb(){
2038 #if CMK_MEM_CHECKPOINT
2039         return CkMemCheckPT::inLoadbalancing;
2040 #endif
2041         return 0;
2042       }
2043
2044     extern "C"
2045       void CkResetInLdb(){
2046 #if CMK_MEM_CHECKPOINT
2047         CkMemCheckPT::inLoadbalancing = 0;
2048 #endif
2049       }
2050
2051     /*****************************************************************************
2052       module initialization
2053      *****************************************************************************/
2054
2055     static int arg_where = CkCheckPoint_inMEM;
2056
2057 #if CMK_MEM_CHECKPOINT
2058     void init_memcheckpt(char **argv)
2059     {
2060       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2061         arg_where = CkCheckPoint_inDISK;
2062       }
2063       CpvInitialize(int, use_checksum);
2064       CpvAccess(use_checksum)=0;
2065       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2066         CpvAccess(use_checksum)=1;
2067       }
2068       // initiliazing _crashedNode variable
2069       CpvInitialize(int, _crashedNode);
2070       CpvInitialize(int, _remoteCrashedNode);
2071       CpvAccess(_crashedNode) = -1;
2072       CpvAccess(_remoteCrashedNode) = -1;
2073       init_FI(argv);
2074     }
2075 #endif
2076
2077     class CkMemCheckPTInit: public Chare {
2078       public:
2079         CkMemCheckPTInit(CkArgMsg *m) {
2080 #if CMK_MEM_CHECKPOINT
2081           if (arg_where == CkCheckPoint_inDISK) {
2082             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2083           }
2084           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2085           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2086 #endif
2087         }
2088     };
2089
2090     static void notifyHandler(char *msg)
2091     {
2092 #if CMK_MEM_CHECKPOINT
2093       CmiFree(msg);
2094       /* immediately increase restart phase to filter old messages */
2095       CpvAccess(_curRestartPhase) ++;
2096       CpvAccess(_qd)->flushStates();
2097       _discard_charm_message();
2098
2099 #endif
2100     }
2101
2102     extern "C"
2103       void notify_crash(int node)
2104       {
2105 #ifdef CMK_MEM_CHECKPOINT
2106         CpvAccess( _crashedNode) = node;
2107 #ifdef CMK_SMP
2108         for(int i=0;i<CkMyNodeSize();i++){
2109           CpvAccessOther(_crashedNode,i)=node;
2110         }
2111 #endif
2112         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2113         CkMemCheckPT::inRestarting = 1;
2114
2115         // this may be in interrupt handler, send a message to reset QD
2116         int pe = CmiNodeFirst(CkMyNode());
2117         for(int i=0;i<CkMyNodeSize();i++){
2118           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2119           CmiSetHandler(msg, notifyHandlerIdx);
2120           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2121         }
2122 #endif
2123       }
2124
2125     extern "C" void (*notify_crash_fn)(int node);
2126
2127
2128 #if CMK_CONVERSE_MPI
2129     void buddyDieHandler(char *msg)
2130     {
2131 #if CMK_MEM_CHECKPOINT
2132       // notify
2133       CkMemCheckPT::inRestarting = 1;
2134       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2135       notify_crash(diepe);
2136       // send message to crash pe to let it restart
2137       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2138       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2139       int buddy = obj->BuddyPE(CmiMyPe());
2140       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2141       if (buddy == diepe)  {
2142         mpi_restart_crashed(diepe, newrank);
2143       }
2144 #endif
2145     }
2146
2147     void pingHandler(void *msg)
2148     {
2149       lastPingTime = CmiWallTimer();
2150       CmiFree(msg);
2151     }
2152
2153     void pingCheckHandler()
2154     {
2155 #if CMK_MEM_CHECKPOINT
2156       double now = CmiWallTimer();
2157       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2158         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2159         int i, pe, buddy;
2160         // tell everyone the buddy dies
2161         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2162         for (i = 1; i < CmiNumPes(); i++) {
2163           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2164           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2165         }
2166         buddy = pe;
2167         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2168         fflush(stdout);
2169         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2170         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2171         CmiSetHandler(msg, buddyDieHandlerIdx);
2172         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2173         //send to everyone in the other world
2174         if(CmiNumPartition()!=1){
2175           //for(int i=0;i<CmiNumPes();i++){
2176             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2177             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2178             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2179             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2180           //}
2181         }
2182       }
2183         else 
2184           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2185 #endif
2186       }
2187
2188       void pingBuddy()
2189       {
2190 #if CMK_MEM_CHECKPOINT
2191         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2192         if (obj) {
2193           int buddy = obj->BuddyPE(CkMyPe());
2194           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2195           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2196           CmiSetHandler(msg, pingHandlerIdx);
2197           CmiGetRestartPhase(msg) = 9999;
2198           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2199         }
2200         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2201 #endif
2202       }
2203 #endif
2204
2205       // initproc
2206       void CkRegisterRestartHandler( )
2207       {
2208 #if CMK_MEM_CHECKPOINT
2209         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2210         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2211         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2212         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2213         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2214
2215         //for replica
2216         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2217         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2218         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2219         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2220         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2221         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2222         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2223         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2224         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2225         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2226
2227 #if CMK_CONVERSE_MPI
2228         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2229         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2230         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2231 #endif
2232
2233         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2234         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2235         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2236         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2237         CpvInitialize(int,curPointer);
2238         CpvInitialize(int,recvdLocal);
2239         CpvInitialize(int,localChkpDone);
2240         CpvInitialize(int,remoteChkpDone);
2241         CpvInitialize(int,remoteStarted);
2242         CpvInitialize(int,localStarted);
2243         CpvInitialize(int,recvdRemote);
2244         CpvInitialize(int,recvdProcChkp);
2245         CpvInitialize(int,localChecksum);
2246         CpvInitialize(int,remoteChecksum);
2247         CpvInitialize(int,recvdArrayChkp);
2248
2249         CpvAccess(procChkptBuf) = NULL;
2250         CpvAccess(buddyBuf) = NULL;
2251         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2252         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2253         CpvAccess(chkpBuf)[0] = NULL;
2254         CpvAccess(chkpBuf)[1] = NULL;
2255         CpvAccess(localProcChkpBuf)[0] = NULL;
2256         CpvAccess(localProcChkpBuf)[1] = NULL;
2257
2258         CpvAccess(curPointer) = 0;
2259         CpvAccess(recvdLocal) = 0;
2260         CpvAccess(localChkpDone) = 0;
2261         CpvAccess(remoteChkpDone) = 0;
2262         CpvAccess(remoteStarted) = 0;
2263         CpvAccess(localStarted) = 0;
2264         CpvAccess(recvdRemote) = 0;
2265         CpvAccess(recvdProcChkp) = 0;
2266         CpvAccess(localChecksum) = 0;
2267         CpvAccess(remoteChecksum) = 0;
2268         CpvAccess(recvdArrayChkp) = 0;
2269
2270         notify_crash_fn = notify_crash;
2271
2272 #if ! CMK_CONVERSE_MPI
2273         // print pid to kill
2274         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2275         //  sleep(4);
2276 #endif
2277 #endif
2278       }
2279
2280
2281       extern "C"
2282         int CkHasCheckpoints()
2283         {
2284           return checkpointed;
2285         }
2286
2287       /// @todo: the following definitions should be moved to a separate file containing
2288       // structures and functions about fault tolerance strategies
2289
2290       /**
2291        *  * @brief: function for killing a process                                             
2292        *   */
2293 #ifdef CMK_MEM_CHECKPOINT
2294 #if CMK_HAS_GETPID
2295       void killLocal(void *_dummy,double curWallTime){
2296         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2297         if(CmiWallTimer()<killTime-1){
2298           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2299         }else{ 
2300 #if CMK_CONVERSE_MPI
2301           CkDieNow();
2302 #else 
2303           kill(getpid(),SIGKILL);                                               
2304 #endif
2305         }              
2306       } 
2307 #else
2308       void killLocal(void *_dummy,double curWallTime){
2309         CmiAbort("kill() not supported!");
2310       }
2311 #endif
2312 #endif
2313
2314 #ifdef CMK_MEM_CHECKPOINT
2315       /**
2316        * @brief: reads the file with the kill information
2317        */
2318       void readKillFile(){
2319         FILE *fp=fopen(killFile,"r");
2320         if(!fp){
2321           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2322           return;
2323         }
2324         int proc;
2325         double sec;
2326         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2327           if(proc == CkMyNode() && CkMyRank() == 0){
2328             killTime = CmiWallTimer()+sec;
2329             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2330             CcdCallFnAfter(killLocal,NULL,sec*1000);
2331           }
2332         }
2333         fclose(fp);
2334       }
2335
2336 #if ! CMK_CONVERSE_MPI
2337       void CkDieNow()
2338       {
2339 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2340         // ignored for non-mpi version
2341         CmiPrintf("[%d] die now.\n", CmiMyPe());
2342         killTime = CmiWallTimer()+0.001;
2343         CcdCallFnAfter(killLocal,NULL,1);
2344 #endif
2345       }
2346 #endif
2347
2348 #endif
2349
2350 #include "CkMemCheckpoint.def.h"
2351
2352