init forward path
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74
75 // static, so that it is accessible from Converse part
76 int CkMemCheckPT::inRestarting = 0;
77 int CkMemCheckPT::inLoadbalancing = 0;
78 double CkMemCheckPT::startTime;
79 char *CkMemCheckPT::stage;
80 CkCallback CkMemCheckPT::cpCallback;
81
82 int _memChkptOn = 1;                    // checkpoint is on or off
83
84 CkGroupID ckCheckPTGroupID;             // readonly
85
86 static int checkpointed = 0;
87
88 /// @todo the following declarations should be moved into a separate file for all 
89 // fault tolerant strategies
90
91 #ifdef CMK_MEM_CHECKPOINT
92 // name of the kill file that contains processes to be killed 
93 char *killFile;                                               
94 // flag for the kill file         
95 int killFlag=0;
96 // variable for storing the killing time
97 double killTime=0.0;
98 #endif
99
100 #ifdef CKLOCMGR_LOOP
101 #undef CKLOCMGR_LOOP
102 #endif
103 // loop over all CkLocMgr and do "code"
104 #define  CKLOCMGR_LOOP(code)    {       \
105   int numGroups = CkpvAccess(_groupIDTable)->size();    \
106   for(int i=0;i<numGroups;i++) {        \
107     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
108     if(obj->isLocMgr())  {      \
109       CkLocMgr *mgr = (CkLocMgr*)obj;   \
110       code      \
111     }   \
112   }     \
113  }
114
115 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
116 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
117 CpvDeclare(CkCheckPTMessage**, chkpBuf);
118 //store the checkpoint of the buddy to compare
119 //do not need the whole msg, can be the checksum
120 CpvDeclare(CkCheckPTMessage*, buddyBuf);
121 //pointer of the checkpoint going to be written
122 CpvDeclare(int, curPointer);
123 CpvDeclare(int, recvdRemote);
124 CpvDeclare(int, recvdLocal);
125
126 bool compare(char * buf1, char * buf2);
127 static inline void _handleProcData(PUP::er &p);
128 // Converse function handles
129 static int askPhaseHandlerIdx;
130 static int recvPhaseHandlerIdx;
131 static int askProcDataHandlerIdx;
132 static int restartBcastHandlerIdx;
133 static int recoverProcDataHandlerIdx;
134 static int restartBeginHandlerIdx;
135 static int recvRemoteChkpHandlerIdx;
136 static int notifyHandlerIdx;
137 // compute the backup processor
138 // FIXME: avoid crashed processors
139 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
140
141 inline int CkMemCheckPT::BuddyPE(int pe)
142 {
143   int budpe;
144 #if NODE_CHECKPOINT
145     // buddy is the processor with same rank on the next physical node
146   int r1 = CmiPhysicalRank(pe);
147   int budnode = CmiPhysicalNodeID(pe);
148   do {
149     budnode = (budnode+1)%CmiNumPhysicalNodes();
150     int *pelist;
151     int num;
152     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
153     budpe = pelist[r1 % num];
154   } while (isFailed(budpe));
155   if (budpe == pe) {
156     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
157     CmiAbort("Failed to find a buddy processor");
158   }
159 #else
160   budpe = pe;
161   while (budpe == pe || isFailed(budpe)) 
162           budpe = (budpe+1)%CkNumPes();
163 #endif
164   return budpe;
165 }
166
167 // called in array element constructor
168 // choose and register with 2 buddies for checkpoiting 
169 #if CMK_MEM_CHECKPOINT
170 void ArrayElement::init_checkpt() {
171         if (_memChkptOn == 0) return;
172         if (CkInRestarting()) {
173           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
174         }
175         // only master init checkpoint
176         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
177
178         budPEs[0] = CkMyPe();
179         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
180         CmiAssert(budPEs[0] != budPEs[1]);
181         // inform checkPTMgr
182         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
183         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
184         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
185         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
186 }
187 #endif
188
189 // entry function invoked by checkpoint mgr asking for checkpoint data
190 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
191 #if CMK_MEM_CHECKPOINT
192 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
193 //char index[128];   thisIndexMax.sprint(index);
194 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
195   CkLocMgr *locMgr = thisArray->getLocMgr();
196   CmiAssert(myRec!=NULL);
197   int size;
198   {
199         PUP::sizer p;
200         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
201         size = p.size();
202   }
203   int packSize = size/sizeof(double) +1;
204   CkArrayCheckPTMessage *msg =
205                  new (packSize, 0) CkArrayCheckPTMessage;
206   msg->len = size;
207   msg->index =thisIndexMax;
208   msg->aid = thisArrayID;
209   msg->locMgr = locMgr->getGroupID();
210   msg->cp_flag = 1;
211   {
212         PUP::toMem p(msg->packData);
213         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
214   }
215
216   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
217   checkptMgr.recvData(msg, 2, budPEs);
218   delete m;
219 #endif
220 }
221
222 // checkpoint holder class - for memory checkpointing
223 class CkMemCheckPTInfo: public CkCheckPTInfo
224 {
225   CkArrayCheckPTMessage *ckBuffer;
226 public:
227   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
228                     CkCheckPTInfo(a, loc, idx, pno)
229   {
230     ckBuffer = NULL;
231   }
232   ~CkMemCheckPTInfo() 
233   {
234     if (ckBuffer) delete ckBuffer; 
235   }
236   inline void updateBuffer(CkArrayCheckPTMessage *data) 
237   {
238     CmiAssert(data!=NULL);
239     if (ckBuffer) delete ckBuffer;
240     ckBuffer = data;
241   }    
242   inline CkArrayCheckPTMessage * getCopy()
243   {
244     if (ckBuffer == NULL) {
245       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
246       CmiAbort("Abort!");
247     }
248     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
249   }     
250   inline void updateBuddy(int b1, int b2) {
251      CmiAssert(ckBuffer);
252      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
253      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
254      CmiAssert(pNo != CkMyPe());
255   }
256   inline int getSize() { 
257      CmiAssert(ckBuffer);
258      return ckBuffer->len; 
259   }
260 };
261
262 // checkpoint holder class - for in-disk checkpointing
263 class CkDiskCheckPTInfo: public CkCheckPTInfo 
264 {
265   char *fname;
266   int bud1, bud2;
267   int len;                      // checkpoint size
268 public:
269   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
270   {
271 #if CMK_USE_MKSTEMP
272     fname = new char[64];
273     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
274     mkstemp(fname);
275 #else
276     fname=tmpnam(NULL);
277 #endif
278     bud1 = bud2 = -1;
279     len = 0;
280   }
281   ~CkDiskCheckPTInfo() 
282   {
283     remove(fname);
284   }
285   inline void updateBuffer(CkArrayCheckPTMessage *data) 
286   {
287     double t = CmiWallTimer();
288     // unpack it
289     envelope *env = UsrToEnv(data);
290     CkUnpackMessage(&env);
291     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
292     FILE *f = fopen(fname,"wb");
293     PUP::toDisk p(f);
294     CkPupMessage(p, (void **)&data);
295     // delay sync to the end because otherwise the messages are blocked
296 //    fsync(fileno(f));
297     fclose(f);
298     bud1 = data->bud1;
299     bud2 = data->bud2;
300     len = data->len;
301     delete data;
302     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
303   }
304   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
305   {
306     CkArrayCheckPTMessage *data;
307     FILE *f = fopen(fname,"rb");
308     PUP::fromDisk p(f);
309     CkPupMessage(p, (void **)&data);
310     fclose(f);
311     data->bud1 = bud1;                          // update the buddies
312     data->bud2 = bud2;
313     return data;
314   }
315   inline void updateBuddy(int b1, int b2) {
316      bud1 = b1; bud2 = b2;
317      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
318      CmiAssert(pNo != CkMyPe());
319   }
320   inline int getSize() { 
321      return len; 
322   }
323 };
324
325 CkMemCheckPT::CkMemCheckPT(int w)
326 {
327   int numnodes = 0;
328 #if NODE_CHECKPOINT
329   numnodes = CmiNumPhysicalNodes();
330 #else
331   numnodes = CkNumPes();
332 #endif
333 #if CK_NO_PROC_POOL
334   if (numnodes <= 2)
335 #else
336   if (numnodes  == 1)
337 #endif
338   {
339     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
340     _memChkptOn = 0;
341   }
342   inRestarting = 0;
343   recvCount = peCount = 0;
344   ackCount = 0;
345   expectCount = -1;
346   where = w;
347
348 #if CMK_CONVERSE_MPI
349   void pingBuddy();
350   void pingCheckHandler();
351   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
352   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
353 #endif
354   chkpTable[0] = NULL;
355   chkpTable[1] = NULL;
356 }
357
358 CkMemCheckPT::~CkMemCheckPT()
359 {
360   int len = ckTable.length();
361   for (int i=0; i<len; i++) {
362     delete ckTable[i];
363   }
364 }
365
366 void CkMemCheckPT::pup(PUP::er& p) 
367
368   CBase_CkMemCheckPT::pup(p); 
369   p|cpStarter;
370   p|thisFailedPe;
371   p|failedPes;
372   p|ckCheckPTGroupID;           // recover global variable
373   p|cpCallback;                 // store callback
374   p|where;                      // where to checkpoint
375   p|peCount;
376   if (p.isUnpacking()) {
377     recvCount = 0;
378 #if CMK_CONVERSE_MPI
379     void pingBuddy();
380     void pingCheckHandler();
381     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
382     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
383 #endif
384   }
385 }
386
387 // called by checkpoint mgr to restore an array element
388 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
389 {
390 #if CMK_MEM_CHECKPOINT
391   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
392   // m->index.print();
393   PUP::fromMem p(m->packData);
394   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
395   CmiAssert(mgr);
396 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
397   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
398 #else
399   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
400 #endif
401
402   // find a list of array elements bound together
403   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
404   CmiAssert(elt);
405   CkLocRec_local *rec = elt->myRec;
406   CkVec<CkMigratable *> list;
407   mgr->migratableList(rec, list);
408   CmiAssert(list.length() > 0);
409   for (int l=0; l<list.length(); l++) {
410     elt = (ArrayElement *)list[l];
411     elt->budPEs[0] = m->bud1;
412     elt->budPEs[1] = m->bud2;
413     //    reset, may not needed now
414     // for now.
415     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
416       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
417       if (c) c->redNo = 0;
418     }
419   }
420 #endif
421 }
422
423 // return 1 if pe is a crashed processor
424 int CkMemCheckPT::isFailed(int pe)
425 {
426   for (int i=0; i<failedPes.length(); i++)
427     if (failedPes[i] == pe) return 1;
428   return 0;
429 }
430
431 // add pe into history list of all failed processors
432 void CkMemCheckPT::failed(int pe)
433 {
434   if (isFailed(pe)) return;
435   failedPes.push_back(pe);
436 }
437
438 int CkMemCheckPT::totalFailed()
439 {
440   return failedPes.length();
441 }
442
443 // create an checkpoint entry for array element of aid with index.
444 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
445 {
446   // error check, no duplicate
447   int idx, len = ckTable.size();
448   for (idx=0; idx<len; idx++) {
449     CkCheckPTInfo *entry = ckTable[idx];
450     if (index == entry->index) {
451       if (loc == entry->locMgr) {
452           // bindTo array elements
453           return;
454       }
455         // for array inheritance, the following check may fail
456         // because ArrayElement constructor of all superclasses are called
457       if (aid == entry->aid) {
458         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
459         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
460       }
461     }
462   }
463   CkCheckPTInfo *newEntry;
464   if (where == CkCheckPoint_inMEM)
465     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
466   else
467     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
468   ckTable.push_back(newEntry);
469   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
470 }
471
472 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
473 {
474 #if !CMK_CHKP_ALL       
475   int buddy = msg->bud1;
476   if (buddy == CkMyPe()) buddy = msg->bud2;
477   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
478   recvData(msg);
479     // ack
480   thisProxy[buddy].gotData();
481 #else
482   chkpTable[0] = NULL;
483   chkpTable[1] = NULL;
484   recvArrayCheckpoint(msg);
485   thisProxy[msg->bud2].gotData();
486 #endif
487 }
488
489 // loop through my checkpoint table and ask checkpointed array elements
490 // to send me checkpoint data.
491 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
492 {
493   checkpointed = 1;
494   cpCallback = cb;
495   cpStarter = starter;
496   if (CkMyPe() == cpStarter) {
497     startTime = CmiWallTimer();
498     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
499   }
500 #if !CMK_CHKP_ALL
501   int len = ckTable.length();
502   for (int i=0; i<len; i++) {
503     CkCheckPTInfo *entry = ckTable[i];
504       // always let the bigger number processor send request
505     //if (CkMyPe() < entry->pNo) continue;
506       // always let the smaller number processor send request, may on same proc
507     if (!isMaster(entry->pNo)) continue;
508       // call inmem_checkpoint to the array element, ask it to send
509       // back checkpoint data via recvData().
510     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
511     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
512   }
513     // if my table is empty, then I am done
514   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
515 #else
516   startCheckpoint();
517   //startArrayCheckpoint();
518 #endif
519   // pack and send proc level data
520   //sendProcData();
521 }
522
523 class MemElementPacker : public CkLocIterator{
524         private:
525                 CkLocMgr *locMgr;
526                 PUP::er &p;
527         public:
528                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
529                 void addLocation(CkLocation &loc){
530                         CkArrayIndexMax idx = loc.getIndex();
531                         CkGroupID gID = locMgr->ckGetGroupID();
532                         ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
533                         CmiAssert(elt);
534                         //elt = (ArrayElement *)locMgr->lookup(idx, aid);
535                         p|gID;
536                         p|idx;
537                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
538                 }
539 };
540
541 void CkMemCheckPT::pupAllElements(PUP::er &p){
542 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
543         int numElements;
544         if(!p.isUnpacking()){
545                 numElements = CkCountArrayElements();
546         }
547         p | numElements;
548         if(!p.isUnpacking()){
549                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
550         }
551 #endif
552 }
553
554 void CkMemCheckPT::startArrayCheckpoint(){
555 #if CMK_CHKP_ALL
556         int size;
557         {
558                 PUP::sizer psizer;
559                 pupAllElements(psizer);
560                 size = psizer.size();
561         }
562         int packSize = size/sizeof(double)+1;
563  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
564         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
565         msg->len = size;
566         msg->cp_flag = 1;
567         int budPEs[2];
568         msg->bud1=CkMyPe();
569         msg->bud2=ChkptOnPe(CkMyPe());
570         {
571                 PUP::toMem p(msg->packData);
572                 pupAllElements(p);
573         }
574         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
575         if(chkpTable[0]) delete chkpTable[0];
576         chkpTable[0] = msg;
577         //send the checkpoint to my 
578         recvCount++;
579 #endif
580 }
581
582 void CkMemCheckPT::startCheckpoint(){
583 #if CMK_CHKP_ALL
584         int size;
585         {
586                 PUP::sizer psizer;
587                 pupAllElements(psizer);
588                 _handleProcData(psizer);
589                 size = psizer.size();
590         }
591         int packSize = size/sizeof(double)+1;
592  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
593         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
594         msg->len = size;
595         msg->cp_flag = 1;
596         int budPEs[2];
597         //msg->bud1=CkMyPe();
598         //msg->bud2=ChkptOnPe(CkMyPe());
599         {
600                 PUP::toMem p(msg->packData);
601                 pupAllElements(p);
602                 _handleProcData(p);
603         }
604         int pointer = CpvAccess(curPointer);
605         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
606                 CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
607         CpvAccess(recvdLocal) = 1;
608
609         envelope * env = (envelope *)(UsrToEnv(msg));
610         CkPackMessage(&env);
611         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
612         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
613         
614         if(CpvAccess(recvdRemote)==1){
615                 //compare the checkpoint 
616           int size = CpvAccess(chkpBuf)[pointer]->len;
617           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
618                 thisProxy[CkMyPe()].doneComparison(true);
619           }else{
620                 thisProxy[CkMyPe()].doneComparison(true);
621           }
622         }
623 #endif
624 }
625
626 void CkMemCheckPT::doneComparison(bool ret){
627         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
628         contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
629 }
630
631 void CkMemCheckPT::doneRComparison(bool ret){
632         if(ret==true){
633         CpvAccess(recvdRemote) = 0;
634         CpvAccess(recvdLocal) = 0;
635         CpvAccess(curPointer)^=1;
636                 cpCallback.send();
637         }else{
638         }
639
640 }
641
642 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
643 {
644 #if CMK_CHKP_ALL
645         int idx = 1;
646         if(msg->bud1 == CkMyPe()){
647                 idx = 0;
648         }
649         int isChkpting = msg->cp_flag;
650         if(isChkpting == 1){
651                 if(chkpTable[idx]) delete chkpTable[idx];
652         }
653         chkpTable[idx] = msg;
654         if(isChkpting){
655                 recvCount++;
656                 if(recvCount == 2){
657                   if (where == CkCheckPoint_inMEM) {
658                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
659                   }
660                   else if (where == CkCheckPoint_inDISK) {
661                         // another barrier for finalize the writing using fsync
662                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
663                         contribute(0,NULL,CkReduction::sum_int,localcb);
664                   }
665                   else
666                         CmiAbort("Unknown checkpoint scheme");
667                   recvCount = 0;
668                 }
669         }
670 #endif
671 }
672
673 // don't handle array elements
674 static inline void _handleProcData(PUP::er &p)
675 {
676     // save readonlys, and callback BTW
677     CkPupROData(p);
678
679     // save mainchares 
680     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
681         
682 #ifndef CMK_CHARE_USE_PTR
683     // save non-migratable chare
684     CkPupChareData(p);
685 #endif
686
687     // save groups into Groups.dat
688     CkPupGroupData(p);
689
690     // save nodegroups into NodeGroups.dat
691     if(CkMyRank()==0) CkPupNodeGroupData(p);
692 }
693
694 void CkMemCheckPT::sendProcData()
695 {
696   // find out size of buffer
697   int size;
698   {
699     PUP::sizer p;
700     _handleProcData(p);
701     size = p.size();
702   }
703   int packSize = size;
704   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
705   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
706   {
707     PUP::toMem p(msg->packData);
708     _handleProcData(p);
709   }
710   msg->pe = CkMyPe();
711   msg->len = size;
712   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
713   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
714 }
715
716 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
717 {
718   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
719   CpvAccess(procChkptBuf) = msg;
720   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
721   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
722 }
723
724 // ArrayElement call this function to give us the checkpointed data
725 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
726 {
727   int len = ckTable.length();
728   int idx;
729   for (idx=0; idx<len; idx++) {
730     CkCheckPTInfo *entry = ckTable[idx];
731     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
732   }
733   CkAssert(idx < len);
734   int isChkpting = msg->cp_flag;
735   ckTable[idx]->updateBuffer(msg);
736   if (isChkpting) {
737       // all my array elements have returned their inmem data
738       // inform starter processor that I am done.
739     recvCount ++;
740     if (recvCount == ckTable.length()) {
741       if (where == CkCheckPoint_inMEM) {
742         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
743       }
744       else if (where == CkCheckPoint_inDISK) {
745         // another barrier for finalize the writing using fsync
746         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
747         contribute(0,NULL,CkReduction::sum_int,localcb);
748       }
749       else
750         CmiAbort("Unknown checkpoint scheme");
751       recvCount = 0;
752     } 
753   }
754 }
755
756 // only used in disk checkpointing
757 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
758 {
759   delete m;
760 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
761   system("sync");
762 #endif
763   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
764 }
765
766 // only is called on cpStarter when checkpoint is done
767 void CkMemCheckPT::cpFinish()
768 {
769   CmiAssert(CkMyPe() == cpStarter);
770   peCount++;
771     // now that all processors have finished, activate callback
772   if (peCount == 2) 
773 {
774     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
775     cpCallback.send();
776     peCount = 0;
777     thisProxy.report();
778   }
779 }
780
781 // for debugging, report checkpoint info
782 void CkMemCheckPT::report()
783 {
784 #if !CMK_CHKP_ALL
785   int objsize = 0;
786   int len = ckTable.length();
787   for (int i=0; i<len; i++) {
788     CkCheckPTInfo *entry = ckTable[i];
789     CmiAssert(entry);
790     objsize += entry->getSize();
791   }
792   CmiAssert(CpvAccess(procChkptBuf));
793   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
794 #else
795   if(CkMyPe()==0)
796   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
797 #endif
798 }
799
800 /*****************************************************************************
801                         RESTART Procedure
802 *****************************************************************************/
803
804 // master processor of two buddies
805 inline int CkMemCheckPT::isMaster(int buddype)
806 {
807 #if 0
808   int mype = CkMyPe();
809 //CkPrintf("ismaster: %d %d\n", pe, mype);
810   if (CkNumPes() - totalFailed() == 2) {
811     return mype > buddype;
812   }
813   for (int i=1; i<CkNumPes(); i++) {
814     int me = (buddype+i)%CkNumPes();
815     if (isFailed(me)) continue;
816     if (me == mype) return 1;
817     else return 0;
818   }
819   return 0;
820 #else
821     // smaller one
822   int mype = CkMyPe();
823 //CkPrintf("ismaster: %d %d\n", pe, mype);
824   if (CkNumPes() - totalFailed() == 2) {
825     return mype < buddype;
826   }
827 #if NODE_CHECKPOINT
828   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
829   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
830 #else
831   for (int i=1; i<CkNumPes(); i++) {
832 #endif
833     int me = (mype+i)%CkNumPes();
834     if (isFailed(me)) continue;
835     if (me == buddype) return 1;
836     else return 0;
837   }
838   return 0;
839 #endif
840 }
841
842
843
844 #if 0
845 // helper class to pup all elements that belong to same ckLocMgr
846 class ElementDestoryer : public CkLocIterator {
847 private:
848         CkLocMgr *locMgr;
849 public:
850         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
851         void addLocation(CkLocation &loc) {
852                 CkArrayIndex idx=loc.getIndex();
853                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
854                 loc.destroy();
855         }
856 };
857 #endif
858
859 // restore the bitmap vector for LB
860 void CkMemCheckPT::resetLB(int diepe)
861 {
862 #if CMK_LBDB_ON
863   int i;
864   char *bitmap = new char[CkNumPes()];
865   // set processor available bitmap
866   get_avail_vector(bitmap);
867
868   for (i=0; i<failedPes.length(); i++)
869     bitmap[failedPes[i]] = 0; 
870   bitmap[diepe] = 0;
871
872 #if CK_NO_PROC_POOL
873   set_avail_vector(bitmap);
874 #endif
875
876   // if I am the crashed pe, rebuild my failedPEs array
877   if (CkMyNode() == diepe)
878     for (i=0; i<CkNumPes(); i++) 
879       if (bitmap[i]==0) failed(i);
880
881   delete [] bitmap;
882 #endif
883 }
884
885 // in case when failedPe dies, everybody go through its checkpoint table:
886 // destory all array elements
887 // recover lost buddies
888 // reconstruct all array elements from check point data
889 // called on all processors
890 void CkMemCheckPT::restart(int diePe)
891 {
892 #if CMK_MEM_CHECKPOINT
893   double curTime = CmiWallTimer();
894   if (CkMyPe() == diePe)
895     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
896   stage = (char*)"resetLB";
897   startTime = curTime;
898   if (CkMyPe() == diePe)
899     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
900
901 #if CK_NO_PROC_POOL
902   failed(diePe);        // add into the list of failed pes
903 #endif
904   thisFailedPe = diePe;
905
906   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
907
908   inRestarting = 1;
909                                                                                 
910   // disable load balancer's barrier
911   if (CkMyPe() != diePe) resetLB(diePe);
912
913   CKLOCMGR_LOOP(mgr->startInserting(););
914
915   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
916   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
917 /*
918   if (CkMyPe() == 0)
919     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
920 */
921 #endif
922 }
923
924 // loally remove all array elements
925 void CkMemCheckPT::removeArrayElements()
926 {
927 #if CMK_MEM_CHECKPOINT
928   int len = ckTable.length();
929   double curTime = CmiWallTimer();
930   if (CkMyPe() == thisFailedPe) 
931     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
932   stage = (char*)"removeArrayElements";
933   startTime = curTime;
934
935   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
936   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
937
938   // get rid of all buffering and remote recs
939   // including destorying all array elements
940 #if CK_NO_PROC_POOL  
941         CKLOCMGR_LOOP(mgr->flushAllRecs(););
942 #else
943         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
944 #endif
945 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
946
947   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
948   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
949 #endif
950 }
951
952 // flush state in reduction manager
953 void CkMemCheckPT::resetReductionMgr()
954 {
955   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
956   int numGroups = CkpvAccess(_groupIDTable)->size();
957   for(int i=0;i<numGroups;i++) {
958     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
959     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
960     obj->flushStates();
961     obj->ckJustMigrated();
962   }
963   // reset again
964   //CpvAccess(_qd)->flushStates();
965
966 #if 1
967   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
968   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
969 #else
970   if (CkMyPe() == 0)
971     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
972 #endif
973 }
974
975 // recover the lost buddies
976 void CkMemCheckPT::recoverBuddies()
977 {
978   int idx;
979   int len = ckTable.length();
980   // ready to flush reduction manager
981   // cannot be CkMemCheckPT::restart because destory will modify states
982   double curTime = CmiWallTimer();
983   if (CkMyPe() == thisFailedPe)
984   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
985   stage = (char *)"recoverBuddies";
986   if (CkMyPe() == thisFailedPe)
987   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
988   startTime = curTime;
989
990   // recover buddies
991   expectCount = 0;
992 #if !CMK_CHKP_ALL
993   for (idx=0; idx<len; idx++) {
994     CkCheckPTInfo *entry = ckTable[idx];
995     if (entry->pNo == thisFailedPe) {
996 #if CK_NO_PROC_POOL
997       // find a new buddy
998       int budPe = BuddyPE(CkMyPe());
999 #else
1000       int budPe = thisFailedPe;
1001 #endif
1002       CkArrayCheckPTMessage *msg = entry->getCopy();
1003       msg->bud1 = budPe;
1004       msg->bud2 = CkMyPe();
1005       msg->cp_flag = 0;            // not checkpointing
1006       thisProxy[budPe].recoverEntry(msg);
1007       expectCount ++;
1008     }
1009   }
1010 #else
1011   //send to failed pe
1012   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1013 #if CK_NO_PROC_POOL
1014       // find a new buddy
1015       int budPe = BuddyPE(CkMyPe());
1016 #else
1017       int budPe = thisFailedPe;
1018 #endif
1019       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1020       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1021           msg->cp_flag = 0;            // not checkpointing
1022       msg->bud1 = budPe;
1023       msg->bud2 = CkMyPe();
1024       thisProxy[budPe].recoverEntry(msg);
1025       expectCount ++;
1026   }
1027 #endif
1028
1029 #if 1
1030   if (expectCount == 0) {
1031     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1032     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1033   }
1034 #else
1035   if (CkMyPe() == 0) {
1036     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1037   }
1038 #endif
1039
1040   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1041 }
1042
1043 void CkMemCheckPT::gotData()
1044 {
1045   ackCount ++;
1046   if (ackCount == expectCount) {
1047     ackCount = 0;
1048     expectCount = -1;
1049     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1050     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1051   }
1052 }
1053
1054 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1055 {
1056
1057   for (int i=0; i<n; i++) {
1058     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1059     mgr->updateLocation(idx[i], nowOnPe);
1060   }
1061         thisProxy[nowOnPe].gotReply();
1062 }
1063
1064 // restore array elements
1065 void CkMemCheckPT::recoverArrayElements()
1066 {
1067   double curTime = CmiWallTimer();
1068   int len = ckTable.length();
1069   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1070   stage = (char *)"recoverArrayElements";
1071   if (CkMyPe() == thisFailedPe)
1072   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1073   startTime = curTime;
1074  int flag = 0;
1075   // recover all array elements
1076   int count = 0;
1077
1078 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1079   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1080   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1081 #endif
1082
1083 #if !CMK_CHKP_ALL
1084   for (int idx=0; idx<len; idx++)
1085   {
1086     CkCheckPTInfo *entry = ckTable[idx];
1087 #if CK_NO_PROC_POOL
1088     // the bigger one will do 
1089 //    if (CkMyPe() < entry->pNo) continue;
1090     if (!isMaster(entry->pNo)) continue;
1091 #else
1092     // smaller one do it, which has the original object
1093     if (CkMyPe() == entry->pNo+1 || 
1094         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1095 #endif
1096 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1097
1098     entry->updateBuddy(CkMyPe(), entry->pNo);
1099     CkArrayCheckPTMessage *msg = entry->getCopy();
1100     // gzheng
1101     //thisProxy[CkMyPe()].inmem_restore(msg);
1102     inmem_restore(msg);
1103 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1104     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1105     int homePe = mgr->homePe(msg->index);
1106     if (homePe != CkMyPe()) {
1107       gmap[homePe].push_back(msg->locMgr);
1108       imap[homePe].push_back(msg->index);
1109     }
1110 #endif
1111     CkFreeMsg(msg);
1112     count ++;
1113   }
1114 #else
1115         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1116 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1117         recoverAll(msg,gmap,imap);
1118 #else
1119         recoverAll(msg);
1120 #endif
1121     CkFreeMsg(msg);
1122 #endif
1123   curTime = CmiWallTimer();
1124   if (CkMyPe() == thisFailedPe)
1125         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1126 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1127   for (int i=0; i<CkNumPes(); i++) {
1128     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1129       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1130         flag++; 
1131           }
1132   }
1133   delete [] imap;
1134   delete [] gmap;
1135 #endif
1136   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1137
1138   CKLOCMGR_LOOP(mgr->doneInserting(););
1139
1140   // _crashedNode = -1;
1141   CpvAccess(_crashedNode) = -1;
1142   inRestarting = 0;
1143 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1144   if (CkMyPe() == 0)
1145     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1146 #else
1147 if(flag == 0)
1148 {
1149     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1150 }
1151 #endif
1152 }
1153
1154 void CkMemCheckPT::gotReply(){
1155     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1156 }
1157
1158 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1159 #if CMK_CHKP_ALL
1160         PUP::fromMem p(msg->packData);
1161         int numElements;
1162         p|numElements;
1163         if(p.isUnpacking()){
1164                 for(int i=0;i<numElements;i++){
1165                         CkGroupID gID;
1166                         CkArrayIndex idx;
1167                         p|gID;
1168                         p|idx;
1169                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1170                         int homePe = mgr->homePe(idx);
1171 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1172                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1173 #else
1174                         mgr->resume(idx,p,CmiFalse,CmiTrue);
1175 #endif
1176 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1177                         homePe = mgr->homePe(idx);
1178                         if (homePe != CkMyPe()) {
1179                           gmap[homePe].push_back(gID);
1180                           imap[homePe].push_back(idx);
1181                         }
1182 #endif
1183                 }
1184         }
1185         if(CkMyPe()==thisFailedPe)
1186         CkPrintf("recover all ends\n"); 
1187 #endif
1188 }
1189
1190 static double restartT;
1191
1192 // on every processor
1193 // turn load balancer back on
1194 void CkMemCheckPT::finishUp()
1195 {
1196   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1197   //CKLOCMGR_LOOP(mgr->doneInserting(););
1198   
1199   if (CkMyPe() == thisFailedPe)
1200   {
1201        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1202        //CkStartQD(cpCallback);
1203        cpCallback.send();
1204        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1205   }
1206
1207 #if CK_NO_PROC_POOL
1208 #if NODE_CHECKPOINT
1209   int numnodes = CmiNumPhysicalNodes();
1210 #else
1211   int numnodes = CkNumPes();
1212 #endif
1213   if (numnodes-totalFailed() <=2) {
1214     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1215     _memChkptOn = 0;
1216   }
1217 #endif
1218 }
1219
1220 // called only on 0
1221 void CkMemCheckPT::quiescence(CkCallback &cb)
1222 {
1223   static int pe_count = 0;
1224   pe_count ++;
1225   CmiAssert(CkMyPe() == 0);
1226   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1227   if (pe_count == CkNumPes()) {
1228     pe_count = 0;
1229     cb.send();
1230   }
1231 }
1232
1233 // User callable function - to start a checkpoint
1234 // callback cb is used to pass control back
1235 void CkStartMemCheckpoint(CkCallback &cb)
1236 {
1237 #if CMK_MEM_CHECKPOINT
1238   if (_memChkptOn == 0) {
1239     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1240     cb.send();
1241     return;
1242   }
1243   if (CkInRestarting()) {
1244       // trying to checkpointing during restart
1245     cb.send();
1246     return;
1247   }
1248     // store user callback and user data
1249   CkMemCheckPT::cpCallback = cb;
1250
1251     // broadcast to start check pointing
1252   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1253   checkptMgr.doItNow(CkMyPe(), cb);
1254 #else
1255   // when mem checkpoint is disabled, invike cb immediately
1256   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1257   cb.send();
1258 #endif
1259 }
1260
1261 void CkRestartCheckPoint(int diePe)
1262 {
1263 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1264   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1265   // broadcast
1266   checkptMgr.restart(diePe);
1267 }
1268
1269 static int _diePE = -1;
1270
1271 // callback function used locally by ccs handler
1272 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1273 {
1274 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1275   CkRestartCheckPoint(_diePE);
1276 }
1277
1278
1279 // called on crashed PE
1280 static void restartBeginHandler(char *msg)
1281 {
1282   CmiFree(msg);
1283 #if CMK_MEM_CHECKPOINT
1284 #if CMK_USE_BARRIER
1285         if(CkMyPe()!=_diePE){
1286                 printf("restar begin on %d\n",CkMyPe());
1287                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1288                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1289                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1290         }else{
1291         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1292         CkRestartCheckPointCallback(NULL, NULL);
1293         }
1294 #else
1295   static int count = 0;
1296   CmiAssert(CkMyPe() == _diePE);
1297   count ++;
1298   if (count == CkNumPes()) {
1299     CkRestartCheckPointCallback(NULL, NULL);
1300     count = 0;
1301   }
1302 #endif
1303 #endif
1304 }
1305
1306 extern void _discard_charm_message();
1307 extern void _resume_charm_message();
1308
1309 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1310         return data;
1311 }
1312
1313 static void restartBcastHandler(char *msg)
1314 {
1315 #if CMK_MEM_CHECKPOINT
1316   // advance phase counter
1317   CkMemCheckPT::inRestarting = 1;
1318   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1319   // gzheng
1320   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1321
1322   if (CkMyPe()==_diePE)
1323     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1324
1325   // reset QD counters
1326 /*  gzheng
1327   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1328 */
1329
1330 /*  gzheng
1331   if (CkMyPe()==_diePE)
1332       CkRestartCheckPointCallback(NULL, NULL);
1333 */
1334   CmiFree(msg);
1335
1336   _resume_charm_message();
1337
1338     // reduction
1339   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1340   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1341 #if CMK_USE_BARRIER
1342         //CmiPrintf("before reduce\n"); 
1343         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1344         //CmiPrintf("after reduce\n");  
1345 #else
1346   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1347 #endif 
1348  checkpointed = 0;
1349 #endif
1350 }
1351
1352 extern void _initDone();
1353
1354 bool compare(char * buf1, char *buf2){
1355         return true;
1356 }
1357
1358 static void recvRemoteChkpHandler(char *msg){
1359    envelope *env = (envelope *)msg;
1360    CkUnpackMessage(&env);
1361    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1362   if(CpvAccess(recvdLocal)==1){
1363           int pointer = CpvAccess(curPointer);
1364           int size = CpvAccess(chkpBuf)[pointer]->len;
1365           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1366           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1367                 
1368                 checkptMgr[CkMyPe()].doneComparison(true);
1369                 }else
1370                 {
1371                         checkptMgr[CkMyPe()].doneComparison(false);
1372                 }
1373   }else{
1374           CpvAccess(recvdRemote) = 1;
1375           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1376           CpvAccess(buddyBuf) = chkpMsg;
1377   }  
1378 }
1379
1380 // called on crashed processor
1381 static void recoverProcDataHandler(char *msg)
1382 {
1383 #if CMK_MEM_CHECKPOINT
1384    int i;
1385    envelope *env = (envelope *)msg;
1386    CkUnpackMessage(&env);
1387    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1388    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1389    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1390    //cur_restart_phase ++;
1391      // gzheng ?
1392    //CpvAccess(_qd)->flushStates();
1393
1394    // restore readonly, mainchare, group, nodegroup
1395 //   int temp = cur_restart_phase;
1396 //   cur_restart_phase = -1;
1397    PUP::fromMem p(procMsg->packData);
1398    _handleProcData(p);
1399
1400    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1401    // gzheng
1402    CKLOCMGR_LOOP(mgr->startInserting(););
1403
1404    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1405    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1406    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1407    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1408
1409    _initDone();
1410 //   CpvAccess(_qd)->flushStates();
1411    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1412 #endif
1413 }
1414
1415 // called on its backup processor
1416 // get backup message buffer and sent to crashed processor
1417 static void askProcDataHandler(char *msg)
1418 {
1419 #if CMK_MEM_CHECKPOINT
1420     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1421     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1422     if (CpvAccess(procChkptBuf) == NULL)  {
1423       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1424       CkAbort("no checkpoint found");
1425     }
1426     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1427     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1428
1429     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1430
1431     CkPackMessage(&env);
1432     CmiSetHandler(env, recoverProcDataHandlerIdx);
1433     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1434     CpvAccess(procChkptBuf) = NULL;
1435     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1436 #endif
1437 }
1438
1439 // called on PE 0
1440 void qd_callback(void *m)
1441 {
1442    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1443    CkFreeMsg(m);
1444 #ifdef CMK_SMP
1445    for(int i=0;i<CmiMyNodeSize();i++){
1446    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1447    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1448         CmiSetHandler(msg, askProcDataHandlerIdx);
1449         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1450         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1451    }
1452    return;
1453 #endif
1454    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1455    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1456    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1457    CmiSetHandler(msg, askProcDataHandlerIdx);
1458    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1459    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1460
1461 }
1462
1463 // on crashed node
1464 void CkMemRestart(const char *dummy, CkArgMsg *args)
1465 {
1466 #if CMK_MEM_CHECKPOINT
1467    _diePE = CmiMyNode();
1468    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1469    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1470    CkMemCheckPT::inRestarting = 1;
1471
1472   CpvAccess( _crashedNode )= CmiMyNode();
1473         
1474   _discard_charm_message();
1475     restartT = CmiWallTimer();
1476    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1477   
1478   /*if(CmiMyRank()==0){
1479     CkCallback cb(qd_callback);
1480     CkStartQD(cb);
1481     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1482   }*/
1483    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1484    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1485    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1486    CmiSetHandler(msg, askProcDataHandlerIdx);
1487    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1488    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1489 #else
1490    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1491 #endif
1492 }
1493
1494 // can be called in other files
1495 // return true if it is in restarting
1496 extern "C"
1497 int CkInRestarting()
1498 {
1499 #if CMK_MEM_CHECKPOINT
1500   if (CpvAccess( _crashedNode)!=-1) return 1;
1501   // gzheng
1502   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1503   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1504   return CkMemCheckPT::inRestarting;
1505 #else
1506   return 0;
1507 #endif
1508 }
1509
1510 extern "C"
1511 void CkSetInLdb(){
1512 #if CMK_MEM_CHECKPOINT
1513         CkMemCheckPT::inLoadbalancing = 1;
1514 #endif
1515 }
1516
1517 extern "C"
1518 int CkInLdb(){
1519 #if CMK_MEM_CHECKPOINT
1520         return CkMemCheckPT::inLoadbalancing;
1521 #endif
1522         return 0;
1523 }
1524
1525 extern "C"
1526 void CkResetInLdb(){
1527 #if CMK_MEM_CHECKPOINT
1528         CkMemCheckPT::inLoadbalancing = 0;
1529 #endif
1530 }
1531
1532 /*****************************************************************************
1533                 module initialization
1534 *****************************************************************************/
1535
1536 static int arg_where = CkCheckPoint_inMEM;
1537
1538 #if CMK_MEM_CHECKPOINT
1539 void init_memcheckpt(char **argv)
1540 {
1541     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1542       arg_where = CkCheckPoint_inDISK;
1543     }
1544
1545         // initiliazing _crashedNode variable
1546         CpvInitialize(int, _crashedNode);
1547         CpvAccess(_crashedNode) = -1;
1548
1549 }
1550 #endif
1551
1552 class CkMemCheckPTInit: public Chare {
1553 public:
1554   CkMemCheckPTInit(CkArgMsg *m) {
1555 #if CMK_MEM_CHECKPOINT
1556     if (arg_where == CkCheckPoint_inDISK) {
1557       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1558     }
1559     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1560     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1561 #endif
1562   }
1563 };
1564
1565 static void notifyHandler(char *msg)
1566 {
1567 #if CMK_MEM_CHECKPOINT
1568   CmiFree(msg);
1569       /* immediately increase restart phase to filter old messages */
1570   CpvAccess(_curRestartPhase) ++;
1571   CpvAccess(_qd)->flushStates();
1572   _discard_charm_message();
1573
1574 #endif
1575 }
1576
1577 extern "C"
1578 void notify_crash(int node)
1579 {
1580 #ifdef CMK_MEM_CHECKPOINT
1581   CpvAccess( _crashedNode) = node;
1582 #ifdef CMK_SMP
1583   for(int i=0;i<CkMyNodeSize();i++){
1584         CpvAccessOther(_crashedNode,i)=node;
1585   }
1586 #endif
1587   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1588   CkMemCheckPT::inRestarting = 1;
1589
1590     // this may be in interrupt handler, send a message to reset QD
1591   int pe = CmiNodeFirst(CkMyNode());
1592   for(int i=0;i<CkMyNodeSize();i++){
1593         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1594         CmiSetHandler(msg, notifyHandlerIdx);
1595         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1596   }
1597 #endif
1598 }
1599
1600 extern "C" void (*notify_crash_fn)(int node);
1601
1602 #if CMK_CONVERSE_MPI
1603 static int pingHandlerIdx;
1604 static int pingCheckHandlerIdx;
1605 static int buddyDieHandlerIdx;
1606 static double lastPingTime = -1;
1607
1608 extern "C" void mpi_restart_crashed(int pe, int rank);
1609 extern "C" int  find_spare_mpirank(int pe);
1610
1611 void pingBuddy();
1612 void pingCheckHandler();
1613
1614 void buddyDieHandler(char *msg)
1615 {
1616 #if CMK_MEM_CHECKPOINT
1617    // notify
1618    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1619    notify_crash(diepe);
1620    // send message to crash pe to let it restart
1621    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1622    int newrank = find_spare_mpirank(diepe);
1623    int buddy = obj->BuddyPE(CmiMyPe());
1624    if (buddy == diepe)  {
1625      mpi_restart_crashed(diepe, newrank);
1626      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1627    }
1628 #endif
1629 }
1630
1631 void pingHandler(void *msg)
1632 {
1633   lastPingTime = CmiWallTimer();
1634   CmiFree(msg);
1635 }
1636
1637 void pingCheckHandler()
1638 {
1639 #if CMK_MEM_CHECKPOINT
1640   double now = CmiWallTimer();
1641   if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
1642     int i, pe, buddy;
1643     // tell everyone the buddy dies
1644     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1645     for (i = 1; i < CmiNumPes(); i++) {
1646        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1647        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1648     }
1649     buddy = pe;
1650     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1651     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1652       if (obj->isFailed(pe) || pe == buddy) continue;
1653       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1654       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1655       CmiSetHandler(msg, buddyDieHandlerIdx);
1656       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1657     }*/
1658     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1659     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1660     CmiSetHandler(msg, buddyDieHandlerIdx);
1661     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1662   }
1663   else 
1664     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1665 #endif
1666 }
1667
1668 void pingBuddy()
1669 {
1670 #if CMK_MEM_CHECKPOINT
1671   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1672   if (obj) {
1673     int buddy = obj->BuddyPE(CkMyPe());
1674 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1675     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1676     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1677     CmiSetHandler(msg, pingHandlerIdx);
1678     CmiGetRestartPhase(msg) = 9999;
1679     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1680   }
1681   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1682 #endif
1683 }
1684 #endif
1685
1686 // initproc
1687 void CkRegisterRestartHandler( )
1688 {
1689 #if CMK_MEM_CHECKPOINT
1690   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1691   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1692   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1693   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1694   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1695   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1696   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
1697
1698 #if CMK_CONVERSE_MPI
1699   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1700   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1701   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1702 #endif
1703
1704   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1705   CpvInitialize(CkCheckPTMessage **, chkpBuf);
1706   CpvInitialize(CkCheckPTMessage *, buddyBuf);
1707   CpvInitialize(int,curPointer);
1708   CpvAccess(procChkptBuf) = NULL;
1709   CpvAccess(buddyBuf) = NULL;
1710   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
1711   CpvAccess(chkpBuf)[0] = NULL;
1712   CpvAccess(chkpBuf)[1] = NULL;
1713   CpvAccess(curPointer) = 0;
1714   CpvAccess(recvdLocal) = 0;
1715   CpvAccess(recvdRemote) = 0;
1716
1717   notify_crash_fn = notify_crash;
1718
1719 #if ! CMK_CONVERSE_MPI
1720   // print pid to kill
1721 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1722 //  sleep(4);
1723 #endif
1724 #endif
1725 }
1726
1727
1728 extern "C"
1729 int CkHasCheckpoints()
1730 {
1731   return checkpointed;
1732 }
1733
1734 /// @todo: the following definitions should be moved to a separate file containing
1735 // structures and functions about fault tolerance strategies
1736
1737 /**
1738  *  * @brief: function for killing a process                                             
1739  *   */
1740 #ifdef CMK_MEM_CHECKPOINT
1741 #if CMK_HAS_GETPID
1742 void killLocal(void *_dummy,double curWallTime){
1743         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1744         if(CmiWallTimer()<killTime-1){
1745                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1746         }else{ 
1747 #if CMK_CONVERSE_MPI
1748                                 CkDieNow();
1749 #else 
1750                 kill(getpid(),SIGKILL);                                               
1751 #endif
1752         }              
1753
1754 #else
1755 void killLocal(void *_dummy,double curWallTime){
1756   CmiAbort("kill() not supported!");
1757 }
1758 #endif
1759 #endif
1760
1761 #ifdef CMK_MEM_CHECKPOINT
1762 /**
1763  * @brief: reads the file with the kill information
1764  */
1765 void readKillFile(){
1766         FILE *fp=fopen(killFile,"r");
1767         if(!fp){
1768                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1769                 return;
1770         }
1771         int proc;
1772         double sec;
1773         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1774                 if(proc == CkMyNode() && CkMyRank() == 0){
1775                         killTime = CmiWallTimer()+sec;
1776                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1777                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1778                 }
1779         }
1780         fclose(fp);
1781 }
1782
1783 #if ! CMK_CONVERSE_MPI
1784 void CkDieNow()
1785 {
1786 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1787          // ignored for non-mpi version
1788         CmiPrintf("[%d] die now.\n", CmiMyPe());
1789         killTime = CmiWallTimer()+0.001;
1790         CcdCallFnAfter(killLocal,NULL,1);
1791 #endif
1792 }
1793 #endif
1794
1795 #endif
1796
1797 #include "CkMemCheckpoint.def.h"
1798
1799