minor
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
440   int elemCount = CkCountChkpSyncElements();
441   if(CkMyPe()==0)
442     startTime = CmiWallTimer();
443   if(elemCount == 0){
444     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
445   }
446 }
447
448 //when one replica failes, decide the next chkp iter
449
450 void CkMemCheckPT::recvIter(int iter){
451   if(maxIter<iter){
452     maxIter=iter;
453   }
454 }
455
456 void CkMemCheckPT::recvMaxIter(int iter){
457   localDecided = false;
458   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
459 }
460
461 void CkMemCheckPT::reachChkpIter(){
462   recvIterCount++;
463   elemCount = CkCountChkpSyncElements();
464   if(recvIterCount == elemCount){
465     recvIterCount = 0;
466     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
467   }
468 }
469
470 void CkMemCheckPT::startChkp(){
471   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
472   CkStartMemCheckpoint(cpCallback);
473 }
474
475 // called by checkpoint mgr to restore an array element
476 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
477 {
478 #if CMK_MEM_CHECKPOINT
479   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
480   // m->index.print();
481   PUP::fromMem p(m->packData);
482   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
483   CmiAssert(mgr);
484 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
485   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
486 #else
487   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
488 #endif
489
490   // find a list of array elements bound together
491   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
492   CmiAssert(elt);
493   CkLocRec_local *rec = elt->myRec;
494   CkVec<CkMigratable *> list;
495   mgr->migratableList(rec, list);
496   CmiAssert(list.length() > 0);
497   for (int l=0; l<list.length(); l++) {
498     elt = (ArrayElement *)list[l];
499     elt->budPEs[0] = m->bud1;
500     elt->budPEs[1] = m->bud2;
501     //    reset, may not needed now
502     // for now.
503     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
504       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
505       if (c) c->redNo = 0;
506     }
507   }
508 #endif
509 }
510
511 // return 1 if pe is a crashed processor
512 int CkMemCheckPT::isFailed(int pe)
513 {
514   for (int i=0; i<failedPes.length(); i++)
515     if (failedPes[i] == pe) return 1;
516   return 0;
517 }
518
519 // add pe into history list of all failed processors
520 void CkMemCheckPT::failed(int pe)
521 {
522   if (isFailed(pe)) return;
523   failedPes.push_back(pe);
524 }
525
526 int CkMemCheckPT::totalFailed()
527 {
528   return failedPes.length();
529 }
530
531 // create an checkpoint entry for array element of aid with index.
532 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
533 {
534   // error check, no duplicate
535   int idx, len = ckTable.size();
536   for (idx=0; idx<len; idx++) {
537     CkCheckPTInfo *entry = ckTable[idx];
538     if (index == entry->index) {
539       if (loc == entry->locMgr) {
540         // bindTo array elements
541         return;
542       }
543       // for array inheritance, the following check may fail
544       // because ArrayElement constructor of all superclasses are called
545       if (aid == entry->aid) {
546         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
547         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
548       }
549     }
550   }
551   CkCheckPTInfo *newEntry;
552   if (where == CkCheckPoint_inMEM)
553     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
554   else
555     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
556   ckTable.push_back(newEntry);
557   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
558 }
559
560 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
561 {
562 #if !CMK_CHKP_ALL       
563   int buddy = msg->bud1;
564   if (buddy == CkMyPe()) buddy = msg->bud2;
565   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
566   recvData(msg);
567   // ack
568   thisProxy[buddy].gotData();
569 #else
570   chkpTable[0] = NULL;
571   chkpTable[1] = NULL;
572   recvArrayCheckpoint(msg);
573   thisProxy[msg->bud2].gotData();
574 #endif
575 }
576
577 // loop through my checkpoint table and ask checkpointed array elements
578 // to send me checkpoint data.
579 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
580 void CkMemCheckPT::doItNow(int starter)
581 {
582   checkpointed = 1;
583   //cpCallback = cb;
584   cpStarter = starter;
585   inCheckpointing = 1;
586   if (CkMyPe() == cpStarter) {
587     startTime = CmiWallTimer();
588     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
589   }
590 #if CMK_CONVERSE_MPI
591   if(CmiNumPartition()==1)
592 #endif    
593   {
594
595 #if !CMK_CHKP_ALL
596     int len = ckTable.length();
597     for (int i=0; i<len; i++) {
598       CkCheckPTInfo *entry = ckTable[i];
599       // always let the bigger number processor send request
600       //if (CkMyPe() < entry->pNo) continue;
601       // always let the smaller number processor send request, may on same proc
602       if (!isMaster(entry->pNo)) continue;
603       // call inmem_checkpoint to the array element, ask it to send
604       // back checkpoint data via recvData().
605       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
606       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
607     }
608     // if my table is empty, then I am done
609     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
610 #else
611     startArrayCheckpoint();
612 #endif
613     sendProcData();
614   }
615 #if CMK_CONVERSE_MPI
616   else
617   {
618     startCheckpoint();
619   }
620 #endif
621   // pack and send proc level data
622 }
623
624 class MemElementPacker : public CkLocIterator{
625   private:
626     //    CkLocMgr *locMgr;
627     PUP::er &p;
628     std::map<CkHashCode,CkLocation> arrayMap;
629   public:
630     MemElementPacker(PUP::er &p_):p(p_){};
631     void addLocation(CkLocation &loc){
632       CkArrayIndexMax idx = loc.getIndex();
633       arrayMap[idx.hash()] = loc;
634     }
635     void writeCheckpoint(){
636       std::map<CkHashCode, CkLocation>::iterator it;
637       for(it = arrayMap.begin();it!=arrayMap.end();it++){
638         CkLocation loc = it->second;
639         CkLocMgr *locMgr = loc.getManager();
640         CkArrayIndexMax idx = loc.getIndex();
641         CkGroupID gID = locMgr->ckGetGroupID();
642         p|gID;
643         p|idx;
644         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
645       }
646     }
647 };
648
649 void pupAllElements(PUP::er &p){
650 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
651   int numElements;
652   if(!p.isUnpacking()){
653     numElements = CkCountArrayElements();
654   }
655   p | numElements;
656   if(!p.isUnpacking()){
657     MemElementPacker packer(p);
658     CKLOCMGR_LOOP(mgr->iterate(packer););
659     packer.writeCheckpoint();
660   }
661 #endif
662 }
663
664 void CkMemCheckPT::startArrayCheckpoint(){
665 #if CMK_CHKP_ALL
666   int size;
667   {
668     PUP::sizer psizer;
669     pupAllElements(psizer);
670     size = psizer.size();
671   }
672   int packSize = size/sizeof(double)+1;
673   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
674   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
675   msg->len = size;
676   msg->cp_flag = 1;
677   int budPEs[2];
678   msg->bud1=CkMyPe();
679   msg->bud2=ChkptOnPe(CkMyPe());
680   {
681     PUP::toMem p(msg->packData);
682     pupAllElements(p);
683   }
684   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
685   if(chkpTable[0]) delete chkpTable[0];
686   chkpTable[0] = msg;
687   //send the checkpoint to my 
688   recvCount++;
689 #endif
690 }
691
692 void CkMemCheckPT::startCheckpoint(){
693 #if CMK_CONVERSE_MPI
694   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
695     CkPrintf("in start checkpointing!!!!\n");
696   int size;
697   {
698     PUP::sizer p;
699     _handleProcData(p,CmiFalse);
700     size = p.size();
701   }
702   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
703   procMsg->len = size;
704   procMsg->cp_flag = 1;
705   {
706     PUP::toMem p(procMsg->packData);
707     _handleProcData(p,CmiFalse);
708   }
709   int pointer = CpvAccess(curPointer);
710   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
711   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
712
713   {
714     PUP::sizer psizer;
715     pupAllElements(psizer);
716     size = psizer.size();
717   }
718   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
719   msg->len = size;
720   msg->cp_flag = 1;
721   int checksum;
722   {
723     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
724       PUP::checker p(msg->packData);
725       pupAllElements(p);
726       checksum = p.getChecksum();
727     }else{  
728 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
729       PUP::toMem p(msg->packData);
730       pupAllElements(p);
731     }
732   }
733   pointer = CpvAccess(curPointer);
734   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
735   CpvAccess(chkpBuf)[pointer] = msg;
736   if(CkMyPe()==0)
737     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
738   if(CkReplicaAlive()==1){
739     CpvAccess(recvdLocal) = 1;
740     if(CpvAccess(use_checksum)){
741       CpvAccess(localChecksum) = checksum;
742       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
743       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
744       //only one reaplica will send
745       if(CmiMyPartition()==0){
746         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
747         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
748       }
749     }else{
750       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
751       CkPackMessage(&env);
752       if(CmiMyPartition()==0){
753         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
754         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
755       }
756     }
757     if(CmiMyPartition()==0){
758       notifyReplica = 1;
759       thisProxy[CkMyPe()].doneComparison(true);
760     }
761   }
762   if(CpvAccess(recvdRemote)==1){
763     //only partition 1 will do it
764     //compare the checkpoint 
765     int size = CpvAccess(chkpBuf)[pointer]->len;
766     if(CpvAccess(use_checksum)){
767       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
768         thisProxy[CkMyPe()].doneComparison(true);
769       }
770       else{
771         thisProxy[CkMyPe()].doneComparison(false);
772       }
773     }else{
774       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
775         thisProxy[CkMyPe()].doneComparison(true);
776       }
777       else{
778         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
779         thisProxy[CkMyPe()].doneComparison(false);
780       }
781     }
782     if(CkMyPe()==0)
783       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
784   }
785   else{
786     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
787       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
788       if(CpvAccess(remoteReady)==1){
789         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
790         {       
791           int pointer = CpvAccess(curPointer);
792           //send the proc data
793           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
794           procMsg->pointer = pointer;
795           envelope * env = (envelope *)(UsrToEnv(procMsg));
796           CkPackMessage(&env);
797           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
798           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
799           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
800             CkPrintf("[%d] sendProcdata\n",CkMyPe());
801           }
802         }
803         //send the array checkpoint data
804         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
805         msg->pointer = CpvAccess(curPointer);
806         envelope * env = (envelope *)(UsrToEnv(msg));
807         CkPackMessage(&env);
808         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
809         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
810         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
811           CkPrintf("[%d] sendArraydata\n",CkMyPe());
812       }else{
813         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
814           int pointer = CpvAccess(curPointer);
815           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
816           (CpvAccess(recoverProcBuf))->pointer = pointer;
817         }
818         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
819         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
820         CpvAccess(localReady) = 1;
821       }
822       //can continue work, no need to wait for my replica
823       int _ret = 1;
824       notifyReplica = 1;
825       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
826       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
827     }
828   }
829 #endif
830 }
831
832 void CkMemCheckPT::doneComparison(bool ret){
833   int _ret = 1;
834   if(!ret){
835     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
836     _ret = 0;
837   }else{
838     _ret = 1;
839   }
840   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
841   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
842 }
843
844 void CkMemCheckPT::doneRComparison(int ret){
845   //    if(CpvAccess(curPointer) == 0){
846   //if(ret==CkNumPes()){
847     CpvAccess(localChkpDone) = 1;
848     if(CpvAccess(remoteChkpDone) ==1){
849       thisProxy.doneBothComparison();
850     }else{
851       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
852     }
853     if(notifyReplica == 0){
854       //notify the replica am done
855       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
856       *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
857       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
858       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
859       notifyReplica = 1;
860     }
861  // }
862  /* else{
863     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
864     startTime = CmiWallTimer();
865     thisProxy.RollBack();
866   }*/
867 }
868
869 void CkMemCheckPT::doneBothComparison(){
870   CpvAccess(recvdRemote) = 0;
871   CpvAccess(remoteReady) = 0;
872   CpvAccess(localReady) = 0;
873   CpvAccess(recvdLocal) = 0;
874   CpvAccess(localChkpDone) = 0;
875   CpvAccess(remoteChkpDone) = 0;
876   CpvAccess(remoteStarted) = 0;
877   CpvAccess(localStarted) = 0;
878   CpvAccess(_remoteCrashedNode) = -1;
879   CkMemCheckPT::replicaAlive = 1;
880   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
881   CpvAccess(curPointer)^=1;
882   inCheckpointing = 0;
883   notifyReplica = 0;
884   if(CkMyPe() == 0){
885     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
886   }
887   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
888 }
889
890 void CkMemCheckPT::RollBack(){
891   //restore group data
892   checkpointed = 0;
893   CkMemCheckPT::inRestarting = 1;
894   int pointer = CpvAccess(curPointer)^1;//use the previous one
895   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
896   PUP::fromMem p(chkpMsg->packData);    
897
898   //destroy array elements
899   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
900   int numGroups = CkpvAccess(_groupIDTable)->size();
901   for(int i=0;i<numGroups;i++) {
902     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
903     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
904     obj->flushStates();
905     obj->ckJustMigrated();
906   }
907   //restore array elements
908
909   int numElements;
910   p|numElements;
911
912   if(p.isUnpacking()){
913     for(int i=0;i<numElements;i++){
914       CkGroupID gID;
915       CkArrayIndex idx;
916       p|gID;
917       p|idx;
918       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
919       CmiAssert(mgr);
920       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
921     }
922     }
923     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
924     contribute(cb);
925   }
926
927   void CkMemCheckPT::notifyReplicaDie(int pe){
928     replicaAlive = 0;
929     CpvAccess(_remoteCrashedNode) = pe;
930   }
931
932   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
933   {
934 #if CMK_CHKP_ALL
935     int idx = 1;
936     if(msg->bud1 == CkMyPe()){
937       idx = 0;
938     }
939     int isChkpting = msg->cp_flag;
940     if(isChkpting == 1){
941       if(chkpTable[idx]) delete chkpTable[idx];
942     }
943     chkpTable[idx] = msg;
944     if(isChkpting){
945       recvCount++;
946       if(recvCount == 2){
947         if (where == CkCheckPoint_inMEM) {
948           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
949         }
950         else if (where == CkCheckPoint_inDISK) {
951           // another barrier for finalize the writing using fsync
952           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
953           contribute(0,NULL,CkReduction::sum_int,localcb);
954         }
955         else
956           CmiAbort("Unknown checkpoint scheme");
957         recvCount = 0;
958       }
959     }
960 #endif
961   }
962
963   // don't handle array elements
964   static inline void _handleProcData(PUP::er &p, CmiBool create)
965   {
966     // save readonlys, and callback BTW
967     CkPupROData(p);
968
969     // save mainchares 
970     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
971
972 #ifndef CMK_CHARE_USE_PTR
973     // save non-migratable chare
974     CkPupChareData(p);
975 #endif
976
977     // save groups into Groups.dat
978     CkPupGroupData(p,create);
979
980     // save nodegroups into NodeGroups.dat
981     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
982   }
983
984   void CkMemCheckPT::sendProcData()
985   {
986     // find out size of buffer
987     int size;
988     {
989       PUP::sizer p;
990       _handleProcData(p,CmiTrue);
991       size = p.size();
992     }
993     int packSize = size;
994     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
995     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
996     {
997       PUP::toMem p(msg->packData);
998       _handleProcData(p,CmiTrue);
999     }
1000     msg->pe = CkMyPe();
1001     msg->len = size;
1002     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1003     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1004   }
1005
1006   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1007   {
1008     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1009     CpvAccess(procChkptBuf) = msg;
1010     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1011     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1012   }
1013
1014   // ArrayElement call this function to give us the checkpointed data
1015   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1016   {
1017     int len = ckTable.length();
1018     int idx;
1019     for (idx=0; idx<len; idx++) {
1020       CkCheckPTInfo *entry = ckTable[idx];
1021       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1022     }
1023     CkAssert(idx < len);
1024     int isChkpting = msg->cp_flag;
1025     ckTable[idx]->updateBuffer(msg);
1026     if (isChkpting) {
1027       // all my array elements have returned their inmem data
1028       // inform starter processor that I am done.
1029       recvCount ++;
1030       if (recvCount == ckTable.length()) {
1031         if (where == CkCheckPoint_inMEM) {
1032           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1033         }
1034         else if (where == CkCheckPoint_inDISK) {
1035           // another barrier for finalize the writing using fsync
1036           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1037           contribute(0,NULL,CkReduction::sum_int,localcb);
1038         }
1039         else
1040           CmiAbort("Unknown checkpoint scheme");
1041         recvCount = 0;
1042       } 
1043     }
1044   }
1045
1046   // only used in disk checkpointing
1047   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1048   {
1049     delete m;
1050 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1051     system("sync");
1052 #endif
1053     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1054   }
1055
1056   // only is called on cpStarter when checkpoint is done
1057   void CkMemCheckPT::cpFinish()
1058   {
1059     CmiAssert(CkMyPe() == cpStarter);
1060     peCount++;
1061     // now that all processors have finished, activate callback
1062     if (peCount == 2) 
1063     {
1064       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1065       peCount = 0;
1066       thisProxy.report();
1067     }
1068   }
1069
1070   // for debugging, report checkpoint info
1071   void CkMemCheckPT::report()
1072   {
1073     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1074     inCheckpointing = 0;
1075 #if !CMK_CHKP_ALL
1076     int objsize = 0;
1077     int len = ckTable.length();
1078     for (int i=0; i<len; i++) {
1079       CkCheckPTInfo *entry = ckTable[i];
1080       CmiAssert(entry);
1081       objsize += entry->getSize();
1082     }
1083     CmiAssert(CpvAccess(procChkptBuf));
1084     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1085 #else
1086     if(CkMyPe()==0)
1087       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1088 #endif
1089   }
1090
1091   /*****************************************************************************
1092     RESTART Procedure
1093    *****************************************************************************/
1094
1095   // master processor of two buddies
1096   inline int CkMemCheckPT::isMaster(int buddype)
1097   {
1098 #if 0
1099     int mype = CkMyPe();
1100     //CkPrintf("ismaster: %d %d\n", pe, mype);
1101     if (CkNumPes() - totalFailed() == 2) {
1102       return mype > buddype;
1103     }
1104     for (int i=1; i<CkNumPes(); i++) {
1105       int me = (buddype+i)%CkNumPes();
1106       if (isFailed(me)) continue;
1107       if (me == mype) return 1;
1108       else return 0;
1109     }
1110     return 0;
1111 #else
1112     // smaller one
1113     int mype = CkMyPe();
1114     //CkPrintf("ismaster: %d %d\n", pe, mype);
1115     if (CkNumPes() - totalFailed() == 2) {
1116       return mype < buddype;
1117     }
1118 #if NODE_CHECKPOINT
1119     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1120     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1121 #else
1122       for (int i=1; i<CkNumPes(); i++) {
1123 #endif
1124         int me = (mype+i)%CkNumPes();
1125         if (isFailed(me)) continue;
1126         if (me == buddype) return 1;
1127         else return 0;
1128       }
1129       return 0;
1130 #endif
1131     }
1132
1133
1134
1135 #if 0
1136     // helper class to pup all elements that belong to same ckLocMgr
1137     class ElementDestoryer : public CkLocIterator {
1138       private:
1139         CkLocMgr *locMgr;
1140       public:
1141         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1142         void addLocation(CkLocation &loc) {
1143           CkArrayIndex idx=loc.getIndex();
1144           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1145           loc.destroy();
1146         }
1147     };
1148 #endif
1149
1150     // restore the bitmap vector for LB
1151     void CkMemCheckPT::resetLB(int diepe)
1152     {
1153 #if CMK_LBDB_ON
1154       int i;
1155       char *bitmap = new char[CkNumPes()];
1156       // set processor available bitmap
1157       get_avail_vector(bitmap);
1158       for (i=0; i<failedPes.length(); i++)
1159         bitmap[failedPes[i]] = 0; 
1160       bitmap[diepe] = 0;
1161
1162 #if CK_NO_PROC_POOL
1163       set_avail_vector(bitmap);
1164 #endif
1165
1166       // if I am the crashed pe, rebuild my failedPEs array
1167       if (CkMyNode() == diepe)
1168         for (i=0; i<CkNumPes(); i++) 
1169           if (bitmap[i]==0) failed(i);
1170
1171       delete [] bitmap;
1172 #endif
1173     }
1174
1175     static double restartT;
1176
1177     // in case when failedPe dies, everybody go through its checkpoint table:
1178     // destory all array elements
1179     // recover lost buddies
1180     // reconstruct all array elements from check point data
1181     // called on all processors
1182     void CkMemCheckPT::restart(int diePe)
1183     {
1184 #if CMK_MEM_CHECKPOINT
1185       double curTime = CmiWallTimer();
1186       if (CkMyPe() == diePe){
1187         restartT = CmiWallTimer();
1188         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1189       }
1190       stage = (char*)"resetLB";
1191       startTime = curTime;
1192       if (CkMyPe() == diePe)
1193         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1194
1195 #if CK_NO_PROC_POOL
1196       failed(diePe);    // add into the list of failed pes
1197 #endif
1198       thisFailedPe = diePe;
1199
1200       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1201
1202       inRestarting = 1;
1203
1204       // disable load balancer's barrier
1205       //if (CkMyPe() != diePe) resetLB(diePe);
1206
1207       CKLOCMGR_LOOP(mgr->startInserting(););
1208
1209
1210       if(CmiNumPartition()==1){
1211         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1212       }else{
1213         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1214         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1215       }
1216 #endif
1217     }
1218
1219     // loally remove all array elements
1220     void CkMemCheckPT::removeArrayElements()
1221     {
1222 #if CMK_MEM_CHECKPOINT
1223       int len = ckTable.length();
1224       double curTime = CmiWallTimer();
1225       if (CkMyPe() == thisFailedPe) 
1226         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1227       stage = (char*)"removeArrayElements";
1228       startTime = curTime;
1229
1230       //  if (cpCallback.isInvalid()) 
1231       //          CkPrintf("invalid pe %d\n",CkMyPe());
1232       //          CkAbort("Didn't set restart callback\n");;
1233       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1234
1235       // get rid of all buffering and remote recs
1236       // including destorying all array elements
1237 #if CK_NO_PROC_POOL  
1238       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1239 #else
1240       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1241 #endif
1242       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1243 #endif
1244     }
1245
1246     // flush state in reduction manager
1247     void CkMemCheckPT::resetReductionMgr()
1248     {
1249       if (CkMyPe() == thisFailedPe) 
1250         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1251       int numGroups = CkpvAccess(_groupIDTable)->size();
1252       for(int i=0;i<numGroups;i++) {
1253         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1254         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1255         obj->flushStates();
1256         obj->ckJustMigrated();
1257       }
1258       // reset again
1259       //CpvAccess(_qd)->flushStates();
1260       if(CmiNumPartition()==1){
1261         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1262       }
1263       else
1264         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1265     }
1266
1267     // recover the lost buddies
1268     void CkMemCheckPT::recoverBuddies()
1269     {
1270       int idx;
1271       int len = ckTable.length();
1272       // ready to flush reduction manager
1273       // cannot be CkMemCheckPT::restart because destory will modify states
1274       double curTime = CmiWallTimer();
1275       if (CkMyPe() == thisFailedPe)
1276         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1277       stage = (char *)"recoverBuddies";
1278       if (CkMyPe() == thisFailedPe)
1279         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1280       startTime = curTime;
1281
1282       // recover buddies
1283       expectCount = 0;
1284 #if !CMK_CHKP_ALL
1285       for (idx=0; idx<len; idx++) {
1286         CkCheckPTInfo *entry = ckTable[idx];
1287         if (entry->pNo == thisFailedPe) {
1288 #if CK_NO_PROC_POOL
1289           // find a new buddy
1290           int budPe = BuddyPE(CkMyPe());
1291 #else
1292           int budPe = thisFailedPe;
1293 #endif
1294           CkArrayCheckPTMessage *msg = entry->getCopy();
1295           msg->bud1 = budPe;
1296           msg->bud2 = CkMyPe();
1297           msg->cp_flag = 0;            // not checkpointing
1298           thisProxy[budPe].recoverEntry(msg);
1299           expectCount ++;
1300         }
1301       }
1302 #else
1303       //send to failed pe
1304       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1305 #if CK_NO_PROC_POOL
1306         // find a new buddy
1307         int budPe = BuddyPE(CkMyPe());
1308 #else
1309         int budPe = thisFailedPe;
1310 #endif
1311         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1312         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1313         msg->cp_flag = 0;            // not checkpointing
1314         msg->bud1 = budPe;
1315         msg->bud2 = CkMyPe();
1316         thisProxy[budPe].recoverEntry(msg);
1317         expectCount ++;
1318       }
1319 #endif
1320
1321       if (expectCount == 0) {
1322         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1323       }
1324       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1325     }
1326
1327     void CkMemCheckPT::gotData()
1328     {
1329       ackCount ++;
1330       if (ackCount == expectCount) {
1331         ackCount = 0;
1332         expectCount = -1;
1333         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1334         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1335       }
1336     }
1337
1338     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1339     {
1340
1341       for (int i=0; i<n; i++) {
1342         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1343         mgr->updateLocation(idx[i], nowOnPe);
1344       }
1345       thisProxy[nowOnPe].gotReply();
1346     }
1347
1348     // restore array elements
1349     void CkMemCheckPT::recoverArrayElements()
1350     {
1351       double curTime = CmiWallTimer();
1352       int len = ckTable.length();
1353       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1354       stage = (char *)"recoverArrayElements";
1355       if (CkMyPe() == thisFailedPe)
1356         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1357       startTime = curTime;
1358       int flag = 0;
1359       // recover all array elements
1360       int count = 0;
1361
1362 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1363       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1364       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1365 #endif
1366
1367 #if !CMK_CHKP_ALL
1368       for (int idx=0; idx<len; idx++)
1369       {
1370         CkCheckPTInfo *entry = ckTable[idx];
1371 #if CK_NO_PROC_POOL
1372         // the bigger one will do 
1373         //    if (CkMyPe() < entry->pNo) continue;
1374         if (!isMaster(entry->pNo)) continue;
1375 #else
1376         // smaller one do it, which has the original object
1377         if (CkMyPe() == entry->pNo+1 || 
1378             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1379 #endif
1380         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1381
1382         entry->updateBuddy(CkMyPe(), entry->pNo);
1383         CkArrayCheckPTMessage *msg = entry->getCopy();
1384         // gzheng
1385         //thisProxy[CkMyPe()].inmem_restore(msg);
1386         inmem_restore(msg);
1387 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1388         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1389         int homePe = mgr->homePe(msg->index);
1390         if (homePe != CkMyPe()) {
1391           gmap[homePe].push_back(msg->locMgr);
1392           imap[homePe].push_back(msg->index);
1393         }
1394 #endif
1395         CkFreeMsg(msg);
1396         count ++;
1397       }
1398 #else
1399       char * packData;
1400       if(CmiNumPartition()==1){
1401         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1402         packData = (char *)msg->packData;
1403       }
1404       else{
1405         int pointer = CpvAccess(curPointer);
1406         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1407         packData = msg->packData;
1408       }
1409 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1410       recoverAll(packData,gmap,imap);
1411 #else
1412       recoverAll(packData);
1413 #endif
1414 #endif
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       for (int i=0; i<CkNumPes(); i++) {
1417         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1418           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1419           flag++;       
1420         }
1421       }
1422       delete [] imap;
1423       delete [] gmap;
1424 #endif
1425       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1426
1427       CKLOCMGR_LOOP(mgr->doneInserting(););
1428
1429       // _crashedNode = -1;
1430       CpvAccess(_crashedNode) = -1;
1431 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1432       if (CkMyPe() == 0)
1433         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1434 #else
1435       if(flag == 0)
1436       {
1437         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1438       }
1439 #endif
1440     }
1441
1442     void CkMemCheckPT::gotReply(){
1443       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1444     }
1445
1446     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1447 #if CMK_CHKP_ALL
1448       PUP::fromMem p(packData);
1449       int numElements;
1450       p|numElements;
1451       if(p.isUnpacking()){
1452         for(int i=0;i<numElements;i++){
1453           CkGroupID gID;
1454           CkArrayIndex idx;
1455           p|gID;
1456           p|idx;
1457           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1458           int homePe = mgr->homePe(idx);
1459 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1460           mgr->resume(idx,p,CmiTrue,CmiTrue);
1461 #else
1462           if(CmiNumPartition()==1)
1463             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1464           else{
1465             if(CkMyPe()==thisFailedPe){
1466               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1467             }
1468             else{
1469               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1470             }
1471           }
1472 #endif
1473 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1474           homePe = mgr->homePe(idx);
1475           if (homePe != CkMyPe()) {
1476             gmap[homePe].push_back(gID);
1477             imap[homePe].push_back(idx);
1478           }
1479 #endif
1480         }
1481       }
1482 #endif
1483     }
1484
1485
1486     // on every processor
1487     // turn load balancer back on
1488     void CkMemCheckPT::finishUp()
1489     {
1490       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1491       //CKLOCMGR_LOOP(mgr->doneInserting(););
1492
1493       if (CkMyPe() == thisFailedPe)
1494       {
1495         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1496         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1497         fflush(stdout);
1498       }
1499       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1500       inRestarting = 0;
1501
1502 #if CMK_CONVERSE_MPI    
1503       if(CmiNumPartition()!=1){
1504         CpvAccess(recvdProcChkp) = 0;
1505         CpvAccess(recvdArrayChkp) = 0;
1506         CpvAccess(curPointer)^=1;
1507         //notify my replica, restart is done
1508         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1509           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1510           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1511           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1512         }
1513       }
1514       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1515         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1516       }
1517 #endif
1518
1519 #if CK_NO_PROC_POOL
1520 #if NODE_CHECKPOINT
1521       int numnodes = CmiNumPhysicalNodes();
1522 #else
1523       int numnodes = CkNumPes();
1524 #endif
1525       if (numnodes-totalFailed() <=2) {
1526         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1527         _memChkptOn = 0;
1528       }
1529 #endif
1530     }
1531
1532     void CkMemCheckPT::recoverFromSoftFailure()
1533     {
1534       inRestarting = 0;
1535       CpvAccess(recvdRemote) = 0;
1536       CpvAccess(recvdLocal) = 0;
1537       CpvAccess(localChkpDone) = 0;
1538       CpvAccess(remoteChkpDone) = 0;
1539       inCheckpointing = 0;
1540       notifyReplica = 0;
1541       if(CkMyPe() == 0){
1542         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1543       }
1544       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1545     }
1546     // called only on 0
1547     void CkMemCheckPT::quiescence(CkCallback &cb)
1548     {
1549       static int pe_count = 0;
1550       pe_count ++;
1551       CmiAssert(CkMyPe() == 0);
1552       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1553       if (pe_count == CkNumPes()) {
1554         pe_count = 0;
1555         cb.send();
1556       }
1557     }
1558
1559     // User callable function - to start a checkpoint
1560     // callback cb is used to pass control back
1561     void CkStartMemCheckpoint(CkCallback &cb)
1562     {
1563 #if CMK_MEM_CHECKPOINT
1564       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1565       /*if (_memChkptOn == 0) {
1566         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1567         cb.send();
1568         return;
1569         }*/
1570       if (CkInRestarting()) {
1571         // trying to checkpointing during restart
1572         cb.send();
1573         return;
1574       }
1575       // store user callback and user data
1576       CkMemCheckPT::cpCallback = cb;
1577
1578
1579       //send to my replica that checkpoint begins 
1580       if(CkReplicaAlive()==1){
1581         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1582         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1583         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1584       }
1585       CpvAccess(localStarted) = 1;
1586       // broadcast to start check pointing
1587       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1588         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1589         checkptMgr.doItNow(CkMyPe());
1590       }
1591 #else
1592       // when mem checkpoint is disabled, invike cb immediately
1593       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1594       cb.send();
1595 #endif
1596     }
1597
1598     void CkRestartCheckPoint(int diePe)
1599     {
1600       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1601       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1602       // broadcast
1603       checkptMgr.restart(diePe);
1604     }
1605
1606     static int _diePE = -1;
1607
1608     // callback function used locally by ccs handler
1609     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1610     {
1611       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1612       CkRestartCheckPoint(_diePE);
1613     }
1614
1615
1616     // called on crashed PE
1617     static void restartBeginHandler(char *msg)
1618     {
1619       CmiFree(msg);
1620 #if CMK_MEM_CHECKPOINT
1621 #if CMK_USE_BARRIER
1622       if(CkMyPe()!=_diePE){
1623         printf("restar begin on %d\n",CkMyPe());
1624         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1625         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1626         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1627       }else{
1628         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1629         CkRestartCheckPointCallback(NULL, NULL);
1630       }
1631 #else
1632       static int count = 0;
1633       CmiAssert(CkMyPe() == _diePE);
1634       count ++;
1635       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1636         printf("restart begin on %d\n",CkMyPe());
1637         CkRestartCheckPointCallback(NULL, NULL);
1638         count = 0;
1639       }
1640 #endif
1641 #endif
1642     }
1643
1644     extern void _discard_charm_message();
1645     extern void _resume_charm_message();
1646
1647     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1648       return data;
1649     }
1650
1651     static void restartBcastHandler(char *msg)
1652     {
1653 #if CMK_MEM_CHECKPOINT
1654       // advance phase counter
1655       CkMemCheckPT::inRestarting = 1;
1656       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1657       // gzheng
1658       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1659
1660       if (CkMyPe()==_diePE)
1661         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1662
1663       // reset QD counters
1664       /*  gzheng
1665           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1666        */
1667
1668       /*  gzheng
1669           if (CkMyPe()==_diePE)
1670           CkRestartCheckPointCallback(NULL, NULL);
1671        */
1672       CmiFree(msg);
1673
1674       _resume_charm_message();
1675
1676       // reduction
1677       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1678       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1679 #if CMK_USE_BARRIER
1680       //CmiPrintf("before reduce\n");   
1681       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1682       //CmiPrintf("after reduce\n");    
1683 #else
1684       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1685 #endif 
1686       checkpointed = 0;
1687 #endif
1688     }
1689
1690     extern void _initDone();
1691
1692     bool compare(char * buf1, char *buf2){
1693       if(CkMyPe()==0)
1694         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1695       PUP::checker pchecker(buf1,buf2);
1696       pchecker.skip();
1697       int numElements;
1698       pchecker|numElements;
1699       for(int i=0;i<numElements;i++){
1700         CkGroupID gID;
1701         CkArrayIndex idx;
1702
1703         pchecker|gID;
1704         pchecker|idx;
1705
1706         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1707         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1708       }
1709       if(CkMyPe()==0)
1710         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1711       //int fault_num = pchecker.getFaultNum();
1712       bool result = pchecker.getResult();
1713       /*if(!result){
1714         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1715       }*/
1716       return result;
1717     }
1718     int getChecksum(char * buf){
1719       PUP::checker pchecker(buf);
1720       pchecker.skip();
1721       int numElements;
1722       pchecker|numElements;
1723 //      CkPrintf("num %d\n",numElements);
1724       for(int i=0;i<numElements;i++){
1725         CkGroupID gID;
1726         CkArrayIndex idx;
1727
1728         pchecker|gID;
1729         pchecker|idx;
1730
1731         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1732         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1733       }
1734       return pchecker.getChecksum();
1735     }
1736
1737     static void recvRemoteChkpHandler(char *msg){
1738       CpvAccess(remoteChkpDone) = 1;
1739       if(CpvAccess(use_checksum)){
1740         if(CkMyPe()==0)
1741           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1742         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1743         CpvAccess(recvdRemote) = 1;
1744         if(CpvAccess(recvdLocal)==1){
1745           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1746             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1747           }
1748           else{
1749             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1750           }
1751         }
1752       }else{
1753         envelope *env = (envelope *)msg;
1754         CkUnpackMessage(&env);
1755         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1756         if(CpvAccess(recvdLocal)==1){
1757           int pointer = CpvAccess(curPointer);
1758           int size = CpvAccess(chkpBuf)[pointer]->len;
1759           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1760             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1761           }else
1762           {
1763             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1764           }
1765           delete chkpMsg;
1766           if(CkMyPe()==0)
1767             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1768         }else{
1769           CpvAccess(recvdRemote) = 1;
1770           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1771           CpvAccess(buddyBuf) = chkpMsg;
1772         }
1773       }
1774     }
1775
1776     static void replicaRecoverHandler(char *msg){
1777       //fflush(stdout);
1778       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1779       CpvAccess(remoteChkpDone) = 1;
1780       if(CpvAccess(localChkpDone) == 1)
1781         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1782       CmiFree(msg);
1783     }
1784
1785     static void replicaChkpDoneHandler(char *msg){
1786       CpvAccess(remoteChkpDone) = 1;
1787       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1788       if(CpvAccess(localChkpDone) == 1)
1789         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1790       CmiFree(msg);
1791     }
1792
1793     static void replicaDieHandler(char * msg){
1794 #if CMK_CONVERSE_MPI
1795       //broadcast to every one in my replica
1796       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1797       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1798 #endif
1799     }
1800
1801     static void replicaChkpStartHandler(char * msg){
1802       CpvAccess(remoteStarted) =1;
1803       if(CpvAccess(localStarted)==1){    
1804         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1805         checkptMgr.doItNow(0);
1806       }
1807     }
1808
1809
1810     static void replicaDieBcastHandler(char *msg){
1811       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1812       CpvAccess(_remoteCrashedNode) = diePe;
1813       if(CkMyPe()==diePe){
1814         CmiPrintf("pe %d in replicad word die\n",diePe);
1815         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1816         fflush(stdout);
1817       }
1818       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1819         find_spare_mpirank(diePe,CmiMyPartition()^1);
1820       }
1821       //broadcast to my partition to get local max iter
1822       if(CpvAccess(resilience)!=1){
1823         CkMemCheckPT::replicaAlive = 0;
1824         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1825       }
1826       CmiFree(msg);
1827     }
1828
1829     static void recoverRemoteProcDataHandler(char *msg){
1830       envelope *env = (envelope *)msg;
1831       CkUnpackMessage(&env);
1832       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1833
1834       //store the checkpoint
1835       int pointer = procMsg->pointer;
1836
1837
1838       if(CkMyPe()==CpvAccess(_crashedNode)){
1839         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1840         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1841         PUP::fromMem p(procMsg->packData);
1842         _handleProcData(p,CmiTrue);
1843         _initDone();
1844       }
1845       else{
1846         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1847         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1848         //_handleProcData(p,CmiFalse);
1849       }
1850       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1851       CKLOCMGR_LOOP(mgr->startInserting(););
1852
1853       CpvAccess(recvdProcChkp) =1;
1854       if(CpvAccess(recvdArrayChkp)==1){
1855         _resume_charm_message();
1856         _diePE = CpvAccess(_crashedNode);
1857         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1858         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1859         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1860       }
1861     }
1862
1863     static void recoverRemoteArrayDataHandler(char *msg){
1864       envelope *env = (envelope *)msg;
1865       CkUnpackMessage(&env);
1866       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1867
1868       //store the checkpoint
1869       int pointer = chkpMsg->pointer;
1870       CpvAccess(curPointer) = pointer;
1871       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1872       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1873       CpvAccess(recvdArrayChkp) =1;
1874       CkMemCheckPT::inRestarting = 1;
1875       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1876         _resume_charm_message();
1877         _diePE = CpvAccess(_crashedNode);
1878         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1879         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1880         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1881         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1882       }
1883     }
1884
1885     static void recvPhaseHandler(char * msg)
1886     {
1887       CpvAccess(_curRestartPhase)--;
1888       CkMemCheckPT::inRestarting = 1;
1889       CmiFree(msg);
1890       //notify the buddy in the replica now i can receive the checkpoint msg
1891       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
1892         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1893         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1894         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1895         //timer start
1896         if(CpvAccess(_crashedNode)==CkMyPe())
1897           CkMemCheckPT::startTime = restartT = CmiWallTimer();
1898         if(CpvAccess(_crashedNode)==CmiMyPe())
1899           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1900       }else{
1901         CpvAccess(curPointer)^=1;
1902         CkMemCheckPT::inRestarting = 1;
1903         _resume_charm_message();
1904         _diePE = CpvAccess(_crashedNode);
1905       }
1906     }
1907     
1908     static void askRecoverDataHandler(char * msg){
1909       if(CpvAccess(resilience)!=1){
1910         CpvAccess(remoteReady)=1;
1911         if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1912           CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1913         if(CpvAccess(localReady)==1){
1914           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1915           {     
1916             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
1917             CkPackMessage(&env);
1918             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1919             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1920             CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
1921           }
1922           //send the array checkpoint data
1923           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1924           CkPackMessage(&env);
1925           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1926           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1927           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1928             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1929         }
1930       }else{
1931         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
1932         int pointer = CpvAccess(curPointer)^1;
1933         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
1934         procMsg->pointer = pointer;
1935         envelope * env = (envelope *)(UsrToEnv(procMsg));
1936         CkPackMessage(&env);
1937         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1938         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1939         CmiPrintf("[%d] sendProcdata after request\n",CmiMyPe());
1940         
1941         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1942         arrayMsg->pointer = pointer;
1943         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
1944         CkPackMessage(&env1);
1945         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
1946         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
1947         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
1948       }
1949     }
1950     // called on crashed processor
1951     static void recoverProcDataHandler(char *msg)
1952     {
1953 #if CMK_MEM_CHECKPOINT
1954       int i;
1955       envelope *env = (envelope *)msg;
1956       CkUnpackMessage(&env);
1957       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1958       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1959       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1960       PUP::fromMem p(procMsg->packData);
1961       _handleProcData(p,CmiTrue);
1962
1963       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1964       // gzheng
1965       CKLOCMGR_LOOP(mgr->startInserting(););
1966
1967       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1968       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1969       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1970       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1971
1972       _initDone();
1973       //   CpvAccess(_qd)->flushStates();
1974       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1975 #endif
1976     }
1977     //for replica, got the phase number from my neighbor processor in the current partition
1978     static void askPhaseHandler(char *msg)
1979     {
1980 #if CMK_MEM_CHECKPOINT
1981       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1982       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1983       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1984       CmiSetHandler(msg, recvPhaseHandlerIdx);
1985       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1986 #endif
1987     }
1988     // called on its backup processor
1989     // get backup message buffer and sent to crashed processor
1990     static void askProcDataHandler(char *msg)
1991     {
1992 #if CMK_MEM_CHECKPOINT
1993       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1994       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1995       if (CpvAccess(procChkptBuf) == NULL)  {
1996         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1997         CkAbort("no checkpoint found");
1998       }
1999       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2000       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2001
2002       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2003
2004       CkPackMessage(&env);
2005       CmiSetHandler(env, recoverProcDataHandlerIdx);
2006       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2007       CpvAccess(procChkptBuf) = NULL;
2008       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2009 #endif
2010     }
2011
2012     // called on PE 0
2013     void qd_callback(void *m)
2014     {
2015       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2016       fflush(stdout);
2017       CkFreeMsg(m);
2018       if(CmiNumPartition()==1){
2019 #ifdef CMK_SMP
2020         for(int i=0;i<CmiMyNodeSize();i++){
2021           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2022           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2023           CmiSetHandler(msg, askProcDataHandlerIdx);
2024           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2025           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2026         }
2027         return;
2028 #endif
2029         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2030         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2031         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2032         CmiSetHandler(msg, askProcDataHandlerIdx);
2033         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2034         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2035       }
2036       else{
2037         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2038         CmiSetHandler(msg, recvPhaseHandlerIdx);
2039         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2040       }
2041     }
2042
2043     // on crashed node
2044     void CkMemRestart(const char *dummy, CkArgMsg *args)
2045     {
2046 #if CMK_MEM_CHECKPOINT
2047       _diePE = CmiMyNode();
2048       CpvAccess( _crashedNode )= CmiMyNode();
2049       CkMemCheckPT::inRestarting = 1;
2050       _discard_charm_message();
2051
2052       /*if(CmiMyRank()==0){
2053         CkCallback cb(qd_callback);
2054         CkStartQD(cb);
2055         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2056         }*/
2057       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2058       if(CmiNumPartition()==1){
2059         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2060         restartT = CmiWallTimer();
2061         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2062         fflush(stdout);
2063         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2064         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2065         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2066         CmiSetHandler(msg, askProcDataHandlerIdx);
2067         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2068         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2069       }
2070       else{
2071         CkCallback cb(qd_callback);
2072         CkStartQD(cb);
2073       }
2074 #else
2075       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2076 #endif
2077     }
2078
2079     // can be called in other files
2080     // return true if it is in restarting
2081     extern "C"
2082       int CkInRestarting()
2083       {
2084 #if CMK_MEM_CHECKPOINT
2085         if (CpvAccess( _crashedNode)!=-1) return 1;
2086         // gzheng
2087         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2088         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2089         return CkMemCheckPT::inRestarting;
2090 #else
2091         return 0;
2092 #endif
2093       }
2094
2095     extern "C"
2096       int CkReplicaAlive()
2097       {
2098         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2099         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2100         return CkMemCheckPT::replicaAlive;
2101
2102         /*if(CkMemCheckPT::replicaDead==1)
2103           return 0;
2104           else          
2105           return 1;*/
2106       }
2107
2108     extern "C"
2109       int CkInCheckpointing()
2110       {
2111         return CkMemCheckPT::inCheckpointing;
2112       }
2113
2114     extern "C"
2115       void CkSetInLdb(){
2116 #if CMK_MEM_CHECKPOINT
2117         CkMemCheckPT::inLoadbalancing = 1;
2118 #endif
2119       }
2120
2121     extern "C"
2122       int CkInLdb(){
2123 #if CMK_MEM_CHECKPOINT
2124         return CkMemCheckPT::inLoadbalancing;
2125 #endif
2126         return 0;
2127       }
2128
2129     extern "C"
2130       void CkResetInLdb(){
2131 #if CMK_MEM_CHECKPOINT
2132         CkMemCheckPT::inLoadbalancing = 0;
2133 #endif
2134       }
2135
2136     /*****************************************************************************
2137       module initialization
2138      *****************************************************************************/
2139
2140     static int arg_where = CkCheckPoint_inMEM;
2141
2142 #if CMK_MEM_CHECKPOINT
2143     void init_memcheckpt(char **argv)
2144     {
2145       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2146         arg_where = CkCheckPoint_inDISK;
2147       }
2148       CpvInitialize(int, use_checksum);
2149       CpvInitialize(int, resilience);
2150       CpvAccess(use_checksum)=0;
2151       CpvAccess(resilience)=0;
2152       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2153         CpvAccess(use_checksum)=1;
2154       }
2155       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2156         CpvAccess(resilience)=1;
2157       }
2158       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2159         CpvAccess(resilience)=2;
2160       }
2161       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2162         CpvAccess(resilience)=3;
2163       }
2164       // initiliazing _crashedNode variable
2165       CpvInitialize(int, _crashedNode);
2166       CpvInitialize(int, _remoteCrashedNode);
2167       CpvAccess(_crashedNode) = -1;
2168       CpvAccess(_remoteCrashedNode) = -1;
2169       init_FI(argv);
2170     }
2171 #endif
2172
2173     class CkMemCheckPTInit: public Chare {
2174       public:
2175         CkMemCheckPTInit(CkArgMsg *m) {
2176 #if CMK_MEM_CHECKPOINT
2177           if (arg_where == CkCheckPoint_inDISK) {
2178             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2179           }
2180           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2181           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2182 #endif
2183         }
2184     };
2185
2186     static void notifyHandler(char *msg)
2187     {
2188 #if CMK_MEM_CHECKPOINT
2189       CmiFree(msg);
2190       /* immediately increase restart phase to filter old messages */
2191       CpvAccess(_curRestartPhase) ++;
2192       CpvAccess(_qd)->flushStates();
2193       _discard_charm_message();
2194
2195 #endif
2196     }
2197
2198     extern "C"
2199       void notify_crash(int node)
2200       {
2201 #ifdef CMK_MEM_CHECKPOINT
2202         CpvAccess( _crashedNode) = node;
2203 #ifdef CMK_SMP
2204         for(int i=0;i<CkMyNodeSize();i++){
2205           CpvAccessOther(_crashedNode,i)=node;
2206         }
2207 #endif
2208         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2209         CkMemCheckPT::inRestarting = 1;
2210
2211         // this may be in interrupt handler, send a message to reset QD
2212         int pe = CmiNodeFirst(CkMyNode());
2213         for(int i=0;i<CkMyNodeSize();i++){
2214           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2215           CmiSetHandler(msg, notifyHandlerIdx);
2216           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2217         }
2218 #endif
2219       }
2220
2221     extern "C" void (*notify_crash_fn)(int node);
2222
2223
2224 #if CMK_CONVERSE_MPI
2225     void buddyDieHandler(char *msg)
2226     {
2227 #if CMK_MEM_CHECKPOINT
2228       // notify
2229       CkMemCheckPT::inRestarting = 1;
2230       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2231       notify_crash(diepe);
2232       // send message to crash pe to let it restart
2233       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2234       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2235       int buddy = obj->BuddyPE(CmiMyPe());
2236       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2237       if (buddy == diepe)  {
2238         mpi_restart_crashed(diepe, newrank);
2239       }
2240 #endif
2241     }
2242
2243     void pingHandler(void *msg)
2244     {
2245       lastPingTime = CmiWallTimer();
2246       CmiFree(msg);
2247     }
2248
2249     void pingCheckHandler()
2250     {
2251 #if CMK_MEM_CHECKPOINT
2252       double now = CmiWallTimer();
2253       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2254         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2255         int i, pe, buddy;
2256         // tell everyone the buddy dies
2257         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2258         for (i = 1; i < CmiNumPes(); i++) {
2259           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2260           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2261         }
2262         buddy = pe;
2263         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2264         fflush(stdout);
2265         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2266         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2267         CmiSetHandler(msg, buddyDieHandlerIdx);
2268         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2269         //send to everyone in the other world
2270         if(CmiNumPartition()!=1){
2271           //for(int i=0;i<CmiNumPes();i++){
2272             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2273             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2274             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2275             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2276           //}
2277         }
2278       }
2279         else 
2280           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2281 #endif
2282       }
2283
2284       void pingBuddy()
2285       {
2286 #if CMK_MEM_CHECKPOINT
2287         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2288         if (obj) {
2289           int buddy = obj->BuddyPE(CkMyPe());
2290           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2291           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2292           CmiSetHandler(msg, pingHandlerIdx);
2293           CmiGetRestartPhase(msg) = 9999;
2294           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2295         }
2296         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2297 #endif
2298       }
2299 #endif
2300
2301       // initproc
2302       void CkRegisterRestartHandler( )
2303       {
2304 #if CMK_MEM_CHECKPOINT
2305         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2306         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2307         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2308         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2309         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2310         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2311
2312         //for replica
2313         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2314         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2315         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2316         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2317         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2318         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2319         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2320         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2321         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2322         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2323
2324 #if CMK_CONVERSE_MPI
2325         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2326         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2327         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2328 #endif
2329
2330         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2331         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2332         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2333         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2334         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2335         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2336         CpvInitialize(int,curPointer);
2337         CpvInitialize(int,recvdLocal);
2338         CpvInitialize(int,localChkpDone);
2339         CpvInitialize(int,remoteChkpDone);
2340         CpvInitialize(int,remoteStarted);
2341         CpvInitialize(int,localStarted);
2342         CpvInitialize(int,localReady);
2343         CpvInitialize(int,remoteReady);
2344         CpvInitialize(int,recvdRemote);
2345         CpvInitialize(int,recvdProcChkp);
2346         CpvInitialize(int,localChecksum);
2347         CpvInitialize(int,remoteChecksum);
2348         CpvInitialize(int,recvdArrayChkp);
2349
2350         CpvAccess(procChkptBuf) = NULL;
2351         CpvAccess(buddyBuf) = NULL;
2352         CpvAccess(recoverProcBuf) = NULL;
2353         CpvAccess(recoverArrayBuf) = NULL;
2354         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2355         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2356         CpvAccess(chkpBuf)[0] = NULL;
2357         CpvAccess(chkpBuf)[1] = NULL;
2358         CpvAccess(localProcChkpBuf)[0] = NULL;
2359         CpvAccess(localProcChkpBuf)[1] = NULL;
2360
2361         CpvAccess(curPointer) = 0;
2362         CpvAccess(recvdLocal) = 0;
2363         CpvAccess(localChkpDone) = 0;
2364         CpvAccess(remoteChkpDone) = 0;
2365         CpvAccess(remoteStarted) = 0;
2366         CpvAccess(localStarted) = 0;
2367         CpvAccess(localReady) = 0;
2368         CpvAccess(remoteReady) = 0;
2369         CpvAccess(recvdRemote) = 0;
2370         CpvAccess(recvdProcChkp) = 0;
2371         CpvAccess(localChecksum) = 0;
2372         CpvAccess(remoteChecksum) = 0;
2373         CpvAccess(recvdArrayChkp) = 0;
2374
2375         notify_crash_fn = notify_crash;
2376
2377 #if ! CMK_CONVERSE_MPI
2378         // print pid to kill
2379         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2380         //  sleep(4);
2381 #endif
2382 #endif
2383       }
2384
2385
2386       extern "C"
2387         int CkHasCheckpoints()
2388         {
2389           return checkpointed;
2390         }
2391
2392       /// @todo: the following definitions should be moved to a separate file containing
2393       // structures and functions about fault tolerance strategies
2394
2395       /**
2396        *  * @brief: function for killing a process                                             
2397        *   */
2398 #ifdef CMK_MEM_CHECKPOINT
2399 #if CMK_HAS_GETPID
2400       void killLocal(void *_dummy,double curWallTime){
2401         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2402         if(CmiWallTimer()<killTime-1){
2403           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2404         }else{ 
2405 #if CMK_CONVERSE_MPI
2406           CkDieNow();
2407 #else 
2408           kill(getpid(),SIGKILL);                                               
2409 #endif
2410         }              
2411       } 
2412 #else
2413       void killLocal(void *_dummy,double curWallTime){
2414         CmiAbort("kill() not supported!");
2415       }
2416 #endif
2417 #endif
2418
2419 #ifdef CMK_MEM_CHECKPOINT
2420       /**
2421        * @brief: reads the file with the kill information
2422        */
2423       void readKillFile(){
2424         FILE *fp=fopen(killFile,"r");
2425         if(!fp){
2426           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2427           return;
2428         }
2429         int proc;
2430         double sec;
2431         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2432           if(proc == CkMyNode() && CkMyRank() == 0){
2433             killTime = CmiWallTimer()+sec;
2434             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2435             CcdCallFnAfter(killLocal,NULL,sec*1000);
2436           }
2437         }
2438         fclose(fp);
2439       }
2440
2441 #if ! CMK_CONVERSE_MPI
2442       void CkDieNow()
2443       {
2444 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2445         // ignored for non-mpi version
2446         CmiPrintf("[%d] die now.\n", CmiMyPe());
2447         killTime = CmiWallTimer()+0.001;
2448         CcdCallFnAfter(killLocal,NULL,1);
2449 #endif
2450       }
2451 #endif
2452
2453 #endif
2454
2455 #include "CkMemCheckpoint.def.h"
2456
2457