support for ampi
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71
72 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
73 #define STREAMING_INFORMHOME                    1
74 CpvDeclare(int, _crashedNode);
75 CpvDeclare(int, _remoteCrashedNode);
76
77 // static, so that it is accessible from Converse part
78 int CkMemCheckPT::inRestarting = 0;
79 int CkMemCheckPT::inCheckpointing = 0;
80 int CkMemCheckPT::replicaAlive = 1;
81 int CkMemCheckPT::inLoadbalancing = 0;
82 double CkMemCheckPT::startTime;
83 char *CkMemCheckPT::stage;
84 CkCallback CkMemCheckPT::cpCallback;
85
86 int _memChkptOn = 1;                    // checkpoint is on or off
87
88 CkGroupID ckCheckPTGroupID;             // readonly
89
90 static int checkpointed = 0;
91
92 /// @todo the following declarations should be moved into a separate file for all 
93 // fault tolerant strategies
94
95 #ifdef CMK_MEM_CHECKPOINT
96 // name of the kill file that contains processes to be killed 
97 char *killFile;                                               
98 // flag for the kill file         
99 int killFlag=0;
100 // variable for storing the killing time
101 double killTime=0.0;
102 #endif
103
104 #ifdef CKLOCMGR_LOOP
105 #undef CKLOCMGR_LOOP
106 #endif
107 // loop over all CkLocMgr and do "code"
108 #define  CKLOCMGR_LOOP(code)    {       \
109   int numGroups = CkpvAccess(_groupIDTable)->size();    \
110   for(int i=0;i<numGroups;i++) {        \
111     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
112     if(obj->isLocMgr())  {      \
113       CkLocMgr *mgr = (CkLocMgr*)obj;   \
114       code      \
115     }   \
116   }     \
117 }
118
119 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
120 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
121 CpvDeclare(CkCheckPTMessage**, chkpBuf);
122 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
123 //store the checkpoint of the buddy to compare
124 //do not need the whole msg, can be the checksum
125 CpvDeclare(CkCheckPTMessage*, buddyBuf);
126 //pointer of the checkpoint going to be written
127 CpvDeclare(int, curPointer);
128 CpvDeclare(int, recvdRemote);
129 CpvDeclare(int, recvdLocal);
130 CpvDeclare(int, localChkpDone);
131 CpvDeclare(int, remoteChkpDone);
132 CpvDeclare(int, recvdArrayChkp);
133 CpvDeclare(int, recvdProcChkp);
134
135 bool compare(char * buf1, char * buf2);
136 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
137 // Converse function handles
138 static int askPhaseHandlerIdx;
139 static int recvPhaseHandlerIdx;
140 static int askProcDataHandlerIdx;
141 static int restartBcastHandlerIdx;
142 static int recoverProcDataHandlerIdx;
143 static int restartBeginHandlerIdx;
144 static int recvRemoteChkpHandlerIdx;
145 static int replicaDieHandlerIdx;
146 static int replicaDieBcastHandlerIdx;
147 static int replicaRecoverHandlerIdx;
148 static int replicaChkpDoneHandlerIdx;
149 static int recoverRemoteProcDataHandlerIdx;
150 static int recoverRemoteArrayDataHandlerIdx;
151 static int notifyHandlerIdx;
152 // compute the backup processor
153 // FIXME: avoid crashed processors
154 #if CMK_CONVERSE_MPI
155 static int pingHandlerIdx;
156 static int pingCheckHandlerIdx;
157 static int buddyDieHandlerIdx;
158 static double lastPingTime = -1;
159
160 extern "C" void mpi_restart_crashed(int pe, int rank);
161 extern "C" int  find_spare_mpirank(int pe,int partition);
162
163 void pingBuddy();
164 void pingCheckHandler();
165
166 #endif
167 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
168
169 inline int CkMemCheckPT::BuddyPE(int pe)
170 {
171   int budpe;
172 #if NODE_CHECKPOINT
173   // buddy is the processor with same rank on the next physical node
174   int r1 = CmiPhysicalRank(pe);
175   int budnode = CmiPhysicalNodeID(pe);
176   do {
177     budnode = (budnode+1)%CmiNumPhysicalNodes();
178     int *pelist;
179     int num;
180     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
181     budpe = pelist[r1 % num];
182   } while (isFailed(budpe));
183   if (budpe == pe) {
184     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
185     CmiAbort("Failed to find a buddy processor");
186   }
187 #else
188   budpe = pe;
189   budpe = (budpe+1)%CkNumPes();
190 #endif
191   return budpe;
192 }
193
194 // called in array element constructor
195 // choose and register with 2 buddies for checkpoiting 
196 #if CMK_MEM_CHECKPOINT
197 void ArrayElement::init_checkpt() {
198   if (_memChkptOn == 0) return;
199   if (CkInRestarting()) {
200     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
201   }
202   // only master init checkpoint
203   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
204
205   budPEs[0] = CkMyPe();
206   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
207   CmiAssert(budPEs[0] != budPEs[1]);
208   // inform checkPTMgr
209   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
210   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
211   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
212   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
213 }
214 #endif
215
216 // entry function invoked by checkpoint mgr asking for checkpoint data
217 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
218 #if CMK_MEM_CHECKPOINT
219   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
220   //char index[128];   thisIndexMax.sprint(index);
221   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
222   CkLocMgr *locMgr = thisArray->getLocMgr();
223   CmiAssert(myRec!=NULL);
224   int size;
225   {
226     PUP::sizer p;
227     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
228     size = p.size();
229   }
230   int packSize = size/sizeof(double) +1;
231   CkArrayCheckPTMessage *msg =
232     new (packSize, 0) CkArrayCheckPTMessage;
233   msg->len = size;
234   msg->index =thisIndexMax;
235   msg->aid = thisArrayID;
236   msg->locMgr = locMgr->getGroupID();
237   msg->cp_flag = 1;
238   {
239     PUP::toMem p(msg->packData);
240     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
241   }
242
243   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
244   checkptMgr.recvData(msg, 2, budPEs);
245   delete m;
246 #endif
247 }
248
249 // checkpoint holder class - for memory checkpointing
250 class CkMemCheckPTInfo: public CkCheckPTInfo
251 {
252   CkArrayCheckPTMessage *ckBuffer;
253   public:
254   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
255     CkCheckPTInfo(a, loc, idx, pno)
256   {
257     ckBuffer = NULL;
258   }
259   ~CkMemCheckPTInfo() 
260   {
261     if (ckBuffer) delete ckBuffer; 
262   }
263   inline void updateBuffer(CkArrayCheckPTMessage *data) 
264   {
265     CmiAssert(data!=NULL);
266     if (ckBuffer) delete ckBuffer;
267     ckBuffer = data;
268   }    
269   inline CkArrayCheckPTMessage * getCopy()
270   {
271     if (ckBuffer == NULL) {
272       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
273       CmiAbort("Abort!");
274     }
275     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
276   }     
277   inline void updateBuddy(int b1, int b2) {
278     CmiAssert(ckBuffer);
279     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
280     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
281     CmiAssert(pNo != CkMyPe());
282   }
283   inline int getSize() { 
284     CmiAssert(ckBuffer);
285     return ckBuffer->len; 
286   }
287 };
288
289 // checkpoint holder class - for in-disk checkpointing
290 class CkDiskCheckPTInfo: public CkCheckPTInfo 
291 {
292   char *fname;
293   int bud1, bud2;
294   int len;                      // checkpoint size
295   public:
296   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
297   {
298 #if CMK_USE_MKSTEMP
299     fname = new char[64];
300     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
301     mkstemp(fname);
302 #else
303     fname=tmpnam(NULL);
304 #endif
305     bud1 = bud2 = -1;
306     len = 0;
307   }
308   ~CkDiskCheckPTInfo() 
309   {
310     remove(fname);
311   }
312   inline void updateBuffer(CkArrayCheckPTMessage *data) 
313   {
314     double t = CmiWallTimer();
315     // unpack it
316     envelope *env = UsrToEnv(data);
317     CkUnpackMessage(&env);
318     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
319     FILE *f = fopen(fname,"wb");
320     PUP::toDisk p(f);
321     CkPupMessage(p, (void **)&data);
322     // delay sync to the end because otherwise the messages are blocked
323     //    fsync(fileno(f));
324     fclose(f);
325     bud1 = data->bud1;
326     bud2 = data->bud2;
327     len = data->len;
328     delete data;
329     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
330   }
331   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
332   {
333     CkArrayCheckPTMessage *data;
334     FILE *f = fopen(fname,"rb");
335     PUP::fromDisk p(f);
336     CkPupMessage(p, (void **)&data);
337     fclose(f);
338     data->bud1 = bud1;                          // update the buddies
339     data->bud2 = bud2;
340     return data;
341   }
342   inline void updateBuddy(int b1, int b2) {
343     bud1 = b1; bud2 = b2;
344     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
345     CmiAssert(pNo != CkMyPe());
346   }
347   inline int getSize() { 
348     return len; 
349   }
350 };
351
352 CkMemCheckPT::CkMemCheckPT(int w)
353 {
354   int numnodes = 0;
355 #if NODE_CHECKPOINT
356   numnodes = CmiNumPhysicalNodes();
357 #else
358   numnodes = CkNumPes();
359 #endif
360 #if CK_NO_PROC_POOL
361   if (numnodes <= 2)
362 #else
363     if (numnodes  == 1)
364 #endif
365     {
366       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
367       _memChkptOn = 0;
368     }
369   inRestarting = 0;
370   inCheckpointing = 0;
371   recvCount = peCount = 0;
372   ackCount = 0;
373   expectCount = -1;
374   where = w;
375   replicaAlive = 1;
376   notifyReplica = 0;
377 #if CMK_CONVERSE_MPI
378   void pingBuddy();
379   void pingCheckHandler();
380   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
381   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
382 #endif
383   chkpTable[0] = NULL;
384   chkpTable[1] = NULL;
385   maxIter = -1;
386   recvIterCount = 0;
387   localDecided = false;
388 }
389
390 CkMemCheckPT::~CkMemCheckPT()
391 {
392   int len = ckTable.length();
393   for (int i=0; i<len; i++) {
394     delete ckTable[i];
395   }
396 }
397
398 void CkMemCheckPT::pup(PUP::er& p) 
399
400   CBase_CkMemCheckPT::pup(p); 
401   p|cpStarter;
402   p|thisFailedPe;
403   p|failedPes;
404   p|ckCheckPTGroupID;           // recover global variable
405   p|cpCallback;                 // store callback
406   p|where;                      // where to checkpoint
407   p|peCount;
408   if (p.isUnpacking()) {
409     recvCount = 0;
410 #if CMK_CONVERSE_MPI
411     void pingBuddy();
412     void pingCheckHandler();
413     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
414     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
415 #endif
416     maxIter = -1;
417     recvIterCount = 0;
418     localDecided = false;
419   }
420 }
421
422 void CkMemCheckPT::getIter(){
423   localDecided = true;
424   localMaxIter = maxIter+1;
425   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
426   int elemCount = CkCountChkpSyncElements();
427   if(elemCount == 0){
428     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
429   }
430 }
431
432 //when one replica failes, decide the next chkp iter
433
434 void CkMemCheckPT::recvIter(int iter){
435   if(maxIter<iter){
436     maxIter=iter;
437   }
438 }
439
440 void CkMemCheckPT::recvMaxIter(int iter){
441   localDecided = false;
442   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
443 }
444
445 void CkMemCheckPT::reachChkpIter(){
446   recvIterCount++;
447   elemCount = CkCountChkpSyncElements();
448   if(recvIterCount == elemCount){
449     recvIterCount = 0;
450     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
451   }
452 }
453
454 void CkMemCheckPT::startChkp(){
455   CkPrintf("start checkpoint\n");
456   CkStartMemCheckpoint(cpCallback);
457 }
458
459 // called by checkpoint mgr to restore an array element
460 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
461 {
462 #if CMK_MEM_CHECKPOINT
463   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
464   // m->index.print();
465   PUP::fromMem p(m->packData);
466   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
467   CmiAssert(mgr);
468 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
469   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
470 #else
471   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
472 #endif
473
474   // find a list of array elements bound together
475   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
476   CmiAssert(elt);
477   CkLocRec_local *rec = elt->myRec;
478   CkVec<CkMigratable *> list;
479   mgr->migratableList(rec, list);
480   CmiAssert(list.length() > 0);
481   for (int l=0; l<list.length(); l++) {
482     elt = (ArrayElement *)list[l];
483     elt->budPEs[0] = m->bud1;
484     elt->budPEs[1] = m->bud2;
485     //    reset, may not needed now
486     // for now.
487     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
488       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
489       if (c) c->redNo = 0;
490     }
491   }
492 #endif
493 }
494
495 // return 1 if pe is a crashed processor
496 int CkMemCheckPT::isFailed(int pe)
497 {
498   for (int i=0; i<failedPes.length(); i++)
499     if (failedPes[i] == pe) return 1;
500   return 0;
501 }
502
503 // add pe into history list of all failed processors
504 void CkMemCheckPT::failed(int pe)
505 {
506   if (isFailed(pe)) return;
507   failedPes.push_back(pe);
508 }
509
510 int CkMemCheckPT::totalFailed()
511 {
512   return failedPes.length();
513 }
514
515 // create an checkpoint entry for array element of aid with index.
516 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
517 {
518   // error check, no duplicate
519   int idx, len = ckTable.size();
520   for (idx=0; idx<len; idx++) {
521     CkCheckPTInfo *entry = ckTable[idx];
522     if (index == entry->index) {
523       if (loc == entry->locMgr) {
524         // bindTo array elements
525         return;
526       }
527       // for array inheritance, the following check may fail
528       // because ArrayElement constructor of all superclasses are called
529       if (aid == entry->aid) {
530         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
531         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
532       }
533     }
534   }
535   CkCheckPTInfo *newEntry;
536   if (where == CkCheckPoint_inMEM)
537     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
538   else
539     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
540   ckTable.push_back(newEntry);
541   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
542 }
543
544 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
545 {
546 #if !CMK_CHKP_ALL       
547   int buddy = msg->bud1;
548   if (buddy == CkMyPe()) buddy = msg->bud2;
549   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
550   recvData(msg);
551   // ack
552   thisProxy[buddy].gotData();
553 #else
554   chkpTable[0] = NULL;
555   chkpTable[1] = NULL;
556   recvArrayCheckpoint(msg);
557   thisProxy[msg->bud2].gotData();
558 #endif
559 }
560
561 // loop through my checkpoint table and ask checkpointed array elements
562 // to send me checkpoint data.
563 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
564 {
565   checkpointed = 1;
566   cpCallback = cb;
567   cpStarter = starter;
568   inCheckpointing = 1;
569   if (CkMyPe() == cpStarter) {
570     startTime = CmiWallTimer();
571     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
572   }
573 #if CMK_CONVERSE_MPI
574   if(CmiNumPartition()==1)
575 #endif    
576   {
577
578 #if !CMK_CHKP_ALL
579     int len = ckTable.length();
580     for (int i=0; i<len; i++) {
581       CkCheckPTInfo *entry = ckTable[i];
582       // always let the bigger number processor send request
583       //if (CkMyPe() < entry->pNo) continue;
584       // always let the smaller number processor send request, may on same proc
585       if (!isMaster(entry->pNo)) continue;
586       // call inmem_checkpoint to the array element, ask it to send
587       // back checkpoint data via recvData().
588       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
589       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
590     }
591     // if my table is empty, then I am done
592     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
593 #else
594     startArrayCheckpoint();
595 #endif
596     sendProcData();
597   }
598 #if CMK_CONVERSE_MPI
599   else
600   {
601     startCheckpoint();
602   }
603 #endif
604   // pack and send proc level data
605 }
606
607 class MemElementPacker : public CkLocIterator{
608   private:
609 //    CkLocMgr *locMgr;
610     PUP::er &p;
611         std::map<CkHashCode,CkLocation> arrayMap;
612   public:
613     MemElementPacker(PUP::er &p_):p(p_){};
614     void addLocation(CkLocation &loc){
615       CkArrayIndexMax idx = loc.getIndex();
616       arrayMap[idx.hash()] = loc;
617     }
618         void writeCheckpoint(){
619                 std::map<CkHashCode, CkLocation>::iterator it;
620                 for(it = arrayMap.begin();it!=arrayMap.end();it++){
621                         if(CkMyPe()==0){
622 //                              CkPrintf("[%d][%d] pack %d\n",CmiMyPartition(),CkMyPe(),it->first);
623                         }
624                         CkLocation loc = it->second;
625                 CkLocMgr *locMgr = loc.getManager();
626                 CkArrayIndexMax idx = loc.getIndex();
627                         CkGroupID gID = locMgr->ckGetGroupID();
628                         p|gID;
629                 p|idx;
630                 locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
631                 }
632         }
633 };
634
635 void pupAllElements(PUP::er &p){
636 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
637   int numElements;
638   if(!p.isUnpacking()){
639     numElements = CkCountArrayElements();
640   }
641   p | numElements;
642   if(!p.isUnpacking()){
643           MemElementPacker packer(p);
644       CKLOCMGR_LOOP(mgr->iterate(packer););
645           packer.writeCheckpoint();
646   }
647 #endif
648 }
649
650 void CkMemCheckPT::startArrayCheckpoint(){
651 #if CMK_CHKP_ALL
652   int size;
653   {
654     PUP::sizer psizer;
655     pupAllElements(psizer);
656     size = psizer.size();
657   }
658   int packSize = size/sizeof(double)+1;
659   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
660   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
661   msg->len = size;
662   msg->cp_flag = 1;
663   int budPEs[2];
664   msg->bud1=CkMyPe();
665   msg->bud2=ChkptOnPe(CkMyPe());
666   {
667     PUP::toMem p(msg->packData);
668     pupAllElements(p);
669   }
670   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
671   if(chkpTable[0]) delete chkpTable[0];
672   chkpTable[0] = msg;
673   //send the checkpoint to my 
674   recvCount++;
675 #endif
676 }
677
678 void CkMemCheckPT::startCheckpoint(){
679 #if CMK_CONVERSE_MPI
680   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
681     CkPrintf("in start checkpointing!!!!\n");
682   int size;
683   {
684     PUP::sizer p;
685     _handleProcData(p,CmiFalse);
686     size = p.size();
687   }
688   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
689   procMsg->len = size;
690   procMsg->cp_flag = 1;
691   {
692     PUP::toMem p(procMsg->packData);
693     _handleProcData(p,CmiFalse);
694   }
695   int pointer = CpvAccess(curPointer);
696   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
697   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
698
699   {
700     PUP::sizer psizer;
701     pupAllElements(psizer);
702     size = psizer.size();
703   }
704   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
705   msg->len = size;
706   msg->cp_flag = 1;
707   {
708     PUP::toMem p(msg->packData);
709     pupAllElements(p);
710   }
711   pointer = CpvAccess(curPointer);
712   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
713   CpvAccess(chkpBuf)[pointer] = msg;
714   if(CkMyPe()==0)
715     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
716   if(CkReplicaAlive()==1){
717     CpvAccess(recvdLocal) = 1;
718     envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
719     CkPackMessage(&env);
720     CmiSetHandler(env,recvRemoteChkpHandlerIdx);
721     CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
722   }
723   if(CpvAccess(recvdRemote)==1){
724     //compare the checkpoint 
725     int size = CpvAccess(chkpBuf)[pointer]->len;
726     CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
727     if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
728       thisProxy[CkMyPe()].doneComparison(true);
729     }else{
730       CkPrintf("[%d][%d] failed the test my size %d buddy size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,CpvAccess(buddyBuf)->len,pointer);
731       thisProxy[CkMyPe()].doneComparison(false);
732     }
733     if(CkMyPe()==0)
734       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
735   }
736   else{
737     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
738       { 
739         int pointer = CpvAccess(curPointer);
740         //send the proc data
741         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
742         procMsg->pointer = pointer;
743         envelope * env = (envelope *)(UsrToEnv(procMsg));
744         CkPackMessage(&env);
745         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
746         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
747         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
748           CkPrintf("[%d] sendProcdata\n",CkMyPe());
749         }
750       }
751       //send the array checkpoint data
752       CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
753       msg->pointer = CpvAccess(curPointer);
754       envelope * env = (envelope *)(UsrToEnv(msg));
755       CkPackMessage(&env);
756       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
757       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
758       if(CkMyPe() == CpvAccess(_remoteCrashedNode))
759         CkPrintf("[%d] sendArraydata\n",CkMyPe());
760       //can continue work, no need to wait for my replica
761     }
762   }
763 #endif
764 }
765
766 void CkMemCheckPT::doneComparison(bool ret){
767   int _ret = 1;
768   if(!ret){
769     CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
770     _ret = 0;
771   }else{
772     _ret = 1;
773   }
774   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
775   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
776 }
777
778 void CkMemCheckPT::doneRComparison(int ret){
779   //    if(CpvAccess(curPointer) == 0){
780   if(ret==CkNumPes()){
781     CpvAccess(localChkpDone) = 1;
782     if(CpvAccess(remoteChkpDone) ==1){
783       thisProxy.doneBothComparison();
784     }
785     if(notifyReplica == 0){
786       //notify the replica am done
787       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
788       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
789       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
790       notifyReplica = 1;
791     }
792   }
793   else{
794     CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
795     thisProxy.RollBack();
796   }
797 }
798
799 void CkMemCheckPT::doneBothComparison(){
800   CpvAccess(recvdRemote) = 0;
801   CpvAccess(recvdLocal) = 0;
802   CpvAccess(localChkpDone) = 0;
803   CpvAccess(remoteChkpDone) = 0;
804   CpvAccess(curPointer)^=1;
805   inCheckpointing = 0;
806   notifyReplica = 0;
807   if(CkMyPe() == 0){
808     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
809   }
810   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
811 }
812
813 void CkMemCheckPT::RollBack(){
814   //restore group data
815   checkpointed = 0;
816   CkMemCheckPT::inRestarting = 1;
817   int pointer = CpvAccess(curPointer)^1;//use the previous one
818   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
819   PUP::fromMem p(chkpMsg->packData);    
820
821   //destroy array elements
822   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
823   int numGroups = CkpvAccess(_groupIDTable)->size();
824   for(int i=0;i<numGroups;i++) {
825     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
826     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
827     obj->flushStates();
828     obj->ckJustMigrated();
829   }
830   //restore array elements
831
832   int numElements;
833   p|numElements;
834
835   if(p.isUnpacking()){
836     for(int i=0;i<numElements;i++){
837       //for(int i=0;i<1;i++){
838       CkGroupID gID;
839       CkArrayIndex idx;
840       p|gID;
841       p|idx;
842       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
843       CmiAssert(mgr);
844       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
845     }
846     }
847     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
848     contribute(cb);
849   }
850
851   void CkMemCheckPT::notifyReplicaDie(int pe){
852     //CkPrintf("[%d] receive replica die\n",CkMyPe());
853     replicaAlive = 0;
854     CpvAccess(_remoteCrashedNode) = pe;
855   }
856
857   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
858   {
859 #if CMK_CHKP_ALL
860     int idx = 1;
861     if(msg->bud1 == CkMyPe()){
862       idx = 0;
863     }
864     int isChkpting = msg->cp_flag;
865     if(isChkpting == 1){
866       if(chkpTable[idx]) delete chkpTable[idx];
867     }
868     chkpTable[idx] = msg;
869     if(isChkpting){
870       recvCount++;
871       if(recvCount == 2){
872         if (where == CkCheckPoint_inMEM) {
873           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
874         }
875         else if (where == CkCheckPoint_inDISK) {
876           // another barrier for finalize the writing using fsync
877           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
878           contribute(0,NULL,CkReduction::sum_int,localcb);
879         }
880         else
881           CmiAbort("Unknown checkpoint scheme");
882         recvCount = 0;
883       }
884     }
885 #endif
886   }
887
888   // don't handle array elements
889   static inline void _handleProcData(PUP::er &p, CmiBool create)
890   {
891     // save readonlys, and callback BTW
892     CkPupROData(p);
893
894     // save mainchares 
895     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
896
897 #ifndef CMK_CHARE_USE_PTR
898     // save non-migratable chare
899     CkPupChareData(p);
900 #endif
901
902     // save groups into Groups.dat
903     CkPupGroupData(p,create);
904
905     // save nodegroups into NodeGroups.dat
906     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
907   }
908
909   void CkMemCheckPT::sendProcData()
910   {
911     // find out size of buffer
912     int size;
913     {
914       PUP::sizer p;
915       _handleProcData(p,CmiTrue);
916       size = p.size();
917     }
918     int packSize = size;
919     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
920     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
921     {
922       PUP::toMem p(msg->packData);
923       _handleProcData(p,CmiTrue);
924     }
925     msg->pe = CkMyPe();
926     msg->len = size;
927     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
928     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
929   }
930
931   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
932   {
933     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
934     CpvAccess(procChkptBuf) = msg;
935     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
936     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
937   }
938
939   // ArrayElement call this function to give us the checkpointed data
940   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
941   {
942     int len = ckTable.length();
943     int idx;
944     for (idx=0; idx<len; idx++) {
945       CkCheckPTInfo *entry = ckTable[idx];
946       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
947     }
948     CkAssert(idx < len);
949     int isChkpting = msg->cp_flag;
950     ckTable[idx]->updateBuffer(msg);
951     if (isChkpting) {
952       // all my array elements have returned their inmem data
953       // inform starter processor that I am done.
954       recvCount ++;
955       if (recvCount == ckTable.length()) {
956         if (where == CkCheckPoint_inMEM) {
957           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
958         }
959         else if (where == CkCheckPoint_inDISK) {
960           // another barrier for finalize the writing using fsync
961           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
962           contribute(0,NULL,CkReduction::sum_int,localcb);
963         }
964         else
965           CmiAbort("Unknown checkpoint scheme");
966         recvCount = 0;
967       } 
968     }
969   }
970
971   // only used in disk checkpointing
972   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
973   {
974     delete m;
975 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
976     system("sync");
977 #endif
978     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
979   }
980
981   // only is called on cpStarter when checkpoint is done
982   void CkMemCheckPT::cpFinish()
983   {
984     CmiAssert(CkMyPe() == cpStarter);
985     peCount++;
986     // now that all processors have finished, activate callback
987     if (peCount == 2) 
988     {
989       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
990       peCount = 0;
991       thisProxy.report();
992     }
993   }
994
995   // for debugging, report checkpoint info
996   void CkMemCheckPT::report()
997   {
998     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
999     inCheckpointing = 0;
1000 #if !CMK_CHKP_ALL
1001     int objsize = 0;
1002     int len = ckTable.length();
1003     for (int i=0; i<len; i++) {
1004       CkCheckPTInfo *entry = ckTable[i];
1005       CmiAssert(entry);
1006       objsize += entry->getSize();
1007     }
1008     CmiAssert(CpvAccess(procChkptBuf));
1009     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1010 #else
1011     if(CkMyPe()==0)
1012       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1013 #endif
1014   }
1015
1016   /*****************************************************************************
1017     RESTART Procedure
1018    *****************************************************************************/
1019
1020   // master processor of two buddies
1021   inline int CkMemCheckPT::isMaster(int buddype)
1022   {
1023 #if 0
1024     int mype = CkMyPe();
1025     //CkPrintf("ismaster: %d %d\n", pe, mype);
1026     if (CkNumPes() - totalFailed() == 2) {
1027       return mype > buddype;
1028     }
1029     for (int i=1; i<CkNumPes(); i++) {
1030       int me = (buddype+i)%CkNumPes();
1031       if (isFailed(me)) continue;
1032       if (me == mype) return 1;
1033       else return 0;
1034     }
1035     return 0;
1036 #else
1037     // smaller one
1038     int mype = CkMyPe();
1039     //CkPrintf("ismaster: %d %d\n", pe, mype);
1040     if (CkNumPes() - totalFailed() == 2) {
1041       return mype < buddype;
1042     }
1043 #if NODE_CHECKPOINT
1044     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1045     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1046 #else
1047       for (int i=1; i<CkNumPes(); i++) {
1048 #endif
1049         int me = (mype+i)%CkNumPes();
1050         if (isFailed(me)) continue;
1051         if (me == buddype) return 1;
1052         else return 0;
1053       }
1054       return 0;
1055 #endif
1056     }
1057
1058
1059
1060 #if 0
1061     // helper class to pup all elements that belong to same ckLocMgr
1062     class ElementDestoryer : public CkLocIterator {
1063       private:
1064         CkLocMgr *locMgr;
1065       public:
1066         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1067         void addLocation(CkLocation &loc) {
1068           CkArrayIndex idx=loc.getIndex();
1069           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1070           loc.destroy();
1071         }
1072     };
1073 #endif
1074
1075     // restore the bitmap vector for LB
1076     void CkMemCheckPT::resetLB(int diepe)
1077     {
1078 #if CMK_LBDB_ON
1079       int i;
1080       char *bitmap = new char[CkNumPes()];
1081       // set processor available bitmap
1082       get_avail_vector(bitmap);
1083       for (i=0; i<failedPes.length(); i++)
1084         bitmap[failedPes[i]] = 0; 
1085       bitmap[diepe] = 0;
1086
1087 #if CK_NO_PROC_POOL
1088       set_avail_vector(bitmap);
1089 #endif
1090
1091       // if I am the crashed pe, rebuild my failedPEs array
1092       if (CkMyNode() == diepe)
1093         for (i=0; i<CkNumPes(); i++) 
1094           if (bitmap[i]==0) failed(i);
1095
1096       delete [] bitmap;
1097 #endif
1098     }
1099
1100     static double restartT;
1101
1102     // in case when failedPe dies, everybody go through its checkpoint table:
1103     // destory all array elements
1104     // recover lost buddies
1105     // reconstruct all array elements from check point data
1106     // called on all processors
1107     void CkMemCheckPT::restart(int diePe)
1108     {
1109 #if CMK_MEM_CHECKPOINT
1110       double curTime = CmiWallTimer();
1111       if (CkMyPe() == diePe){
1112         restartT = CmiWallTimer();
1113         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1114       }
1115       stage = (char*)"resetLB";
1116       startTime = curTime;
1117       if (CkMyPe() == diePe)
1118         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1119
1120 #if CK_NO_PROC_POOL
1121       failed(diePe);    // add into the list of failed pes
1122 #endif
1123       thisFailedPe = diePe;
1124
1125       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1126
1127       inRestarting = 1;
1128
1129       // disable load balancer's barrier
1130       //if (CkMyPe() != diePe) resetLB(diePe);
1131
1132       CKLOCMGR_LOOP(mgr->startInserting(););
1133
1134
1135       if(CmiNumPartition()==1){
1136         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1137       }else{
1138         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1139         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1140       }
1141 #endif
1142     }
1143
1144     // loally remove all array elements
1145     void CkMemCheckPT::removeArrayElements()
1146     {
1147 #if CMK_MEM_CHECKPOINT
1148       int len = ckTable.length();
1149       double curTime = CmiWallTimer();
1150       if (CkMyPe() == thisFailedPe) 
1151         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1152       stage = (char*)"removeArrayElements";
1153       startTime = curTime;
1154
1155       //  if (cpCallback.isInvalid()) 
1156       //          CkPrintf("invalid pe %d\n",CkMyPe());
1157       //          CkAbort("Didn't set restart callback\n");;
1158       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1159
1160       // get rid of all buffering and remote recs
1161       // including destorying all array elements
1162 #if CK_NO_PROC_POOL  
1163       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1164 #else
1165       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1166 #endif
1167       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1168 #endif
1169     }
1170
1171     // flush state in reduction manager
1172     void CkMemCheckPT::resetReductionMgr()
1173     {
1174       if (CkMyPe() == thisFailedPe) 
1175         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1176       int numGroups = CkpvAccess(_groupIDTable)->size();
1177       for(int i=0;i<numGroups;i++) {
1178         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1179         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1180         obj->flushStates();
1181         obj->ckJustMigrated();
1182       }
1183       // reset again
1184       //CpvAccess(_qd)->flushStates();
1185       if(CmiNumPartition()==1){
1186         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1187       }
1188       else
1189         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1190     }
1191
1192     // recover the lost buddies
1193     void CkMemCheckPT::recoverBuddies()
1194     {
1195       int idx;
1196       int len = ckTable.length();
1197       // ready to flush reduction manager
1198       // cannot be CkMemCheckPT::restart because destory will modify states
1199       double curTime = CmiWallTimer();
1200       if (CkMyPe() == thisFailedPe)
1201         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1202       stage = (char *)"recoverBuddies";
1203       if (CkMyPe() == thisFailedPe)
1204         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1205       startTime = curTime;
1206
1207       // recover buddies
1208       expectCount = 0;
1209 #if !CMK_CHKP_ALL
1210       for (idx=0; idx<len; idx++) {
1211         CkCheckPTInfo *entry = ckTable[idx];
1212         if (entry->pNo == thisFailedPe) {
1213 #if CK_NO_PROC_POOL
1214           // find a new buddy
1215           int budPe = BuddyPE(CkMyPe());
1216 #else
1217           int budPe = thisFailedPe;
1218 #endif
1219           CkArrayCheckPTMessage *msg = entry->getCopy();
1220           msg->bud1 = budPe;
1221           msg->bud2 = CkMyPe();
1222           msg->cp_flag = 0;            // not checkpointing
1223           thisProxy[budPe].recoverEntry(msg);
1224           expectCount ++;
1225         }
1226       }
1227 #else
1228       //send to failed pe
1229       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1230 #if CK_NO_PROC_POOL
1231         // find a new buddy
1232         int budPe = BuddyPE(CkMyPe());
1233 #else
1234         int budPe = thisFailedPe;
1235 #endif
1236         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1237         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1238         msg->cp_flag = 0;            // not checkpointing
1239         msg->bud1 = budPe;
1240         msg->bud2 = CkMyPe();
1241         thisProxy[budPe].recoverEntry(msg);
1242         expectCount ++;
1243       }
1244 #endif
1245
1246       if (expectCount == 0) {
1247         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1248       }
1249       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1250     }
1251
1252     void CkMemCheckPT::gotData()
1253     {
1254       ackCount ++;
1255       if (ackCount == expectCount) {
1256         ackCount = 0;
1257         expectCount = -1;
1258         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1259         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1260       }
1261     }
1262
1263     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1264     {
1265
1266       for (int i=0; i<n; i++) {
1267         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1268         mgr->updateLocation(idx[i], nowOnPe);
1269       }
1270       thisProxy[nowOnPe].gotReply();
1271     }
1272
1273     // restore array elements
1274     void CkMemCheckPT::recoverArrayElements()
1275     {
1276       if(CmiMyPartition()==1&&CkMyPe()==0){
1277         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1278         fflush(stdout);
1279       }
1280       double curTime = CmiWallTimer();
1281       int len = ckTable.length();
1282       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1283       stage = (char *)"recoverArrayElements";
1284       if (CkMyPe() == thisFailedPe)
1285         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1286       startTime = curTime;
1287       int flag = 0;
1288       // recover all array elements
1289       int count = 0;
1290
1291 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1292       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1293       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1294 #endif
1295
1296 #if !CMK_CHKP_ALL
1297       for (int idx=0; idx<len; idx++)
1298       {
1299         CkCheckPTInfo *entry = ckTable[idx];
1300 #if CK_NO_PROC_POOL
1301         // the bigger one will do 
1302         //    if (CkMyPe() < entry->pNo) continue;
1303         if (!isMaster(entry->pNo)) continue;
1304 #else
1305         // smaller one do it, which has the original object
1306         if (CkMyPe() == entry->pNo+1 || 
1307             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1308 #endif
1309         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1310
1311         entry->updateBuddy(CkMyPe(), entry->pNo);
1312         CkArrayCheckPTMessage *msg = entry->getCopy();
1313         // gzheng
1314         //thisProxy[CkMyPe()].inmem_restore(msg);
1315         inmem_restore(msg);
1316 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1317         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1318         int homePe = mgr->homePe(msg->index);
1319         if (homePe != CkMyPe()) {
1320           gmap[homePe].push_back(msg->locMgr);
1321           imap[homePe].push_back(msg->index);
1322         }
1323 #endif
1324         CkFreeMsg(msg);
1325         count ++;
1326       }
1327 #else
1328       char * packData;
1329       if(CmiNumPartition()==1){
1330         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1331         packData = (char *)msg->packData;
1332       }
1333       else{
1334         int pointer = CpvAccess(curPointer);
1335         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1336         packData = msg->packData;
1337       }
1338 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1339       recoverAll(packData,gmap,imap);
1340 #else
1341       recoverAll(packData);
1342 #endif
1343 #endif
1344       curTime = CmiWallTimer();
1345 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1346       for (int i=0; i<CkNumPes(); i++) {
1347         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1348           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1349           flag++;       
1350         }
1351       }
1352       delete [] imap;
1353       delete [] gmap;
1354 #endif
1355       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1356
1357       CKLOCMGR_LOOP(mgr->doneInserting(););
1358
1359       // _crashedNode = -1;
1360       CpvAccess(_crashedNode) = -1;
1361 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1362       if (CkMyPe() == 0)
1363         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1364 #else
1365       if(flag == 0)
1366       {
1367         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1368       }
1369 #endif
1370       if(CmiMyPartition()==1&&CkMyPe()==0){
1371         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1372         fflush(stdout);
1373       }
1374     }
1375
1376     void CkMemCheckPT::gotReply(){
1377       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1378     }
1379
1380     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1381       if(CmiMyPartition()==1&&CkMyPe()==0){
1382         CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1383         fflush(stdout);
1384       }
1385 #if CMK_CHKP_ALL
1386       PUP::fromMem p(packData);
1387       int numElements;
1388       p|numElements;
1389       if(p.isUnpacking()){
1390         for(int i=0;i<numElements;i++){
1391           CkGroupID gID;
1392           CkArrayIndex idx;
1393           p|gID;
1394           p|idx;
1395           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1396           int homePe = mgr->homePe(idx);
1397 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1398           mgr->resume(idx,p,CmiTrue,CmiTrue);
1399 #else
1400           if(CmiNumPartition()==1)
1401             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1402           else{
1403             if(CkMyPe()==thisFailedPe){
1404               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1405             }
1406             else{
1407               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1408             }
1409           }
1410 #endif
1411 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1412           homePe = mgr->homePe(idx);
1413           if (homePe != CkMyPe()) {
1414             gmap[homePe].push_back(gID);
1415             imap[homePe].push_back(idx);
1416           }
1417 #endif
1418         }
1419       }
1420 #endif
1421       if(CmiMyPartition()==1&&CkMyPe()==0){
1422         CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1423         fflush(stdout);
1424       }
1425     }
1426
1427
1428     // on every processor
1429     // turn load balancer back on
1430     void CkMemCheckPT::finishUp()
1431     {
1432       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1433       //CKLOCMGR_LOOP(mgr->doneInserting(););
1434
1435       if (CkMyPe() == thisFailedPe)
1436       {
1437         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1438         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1439         fflush(stdout);
1440       }
1441       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1442       inRestarting = 0;
1443
1444 #if CMK_CONVERSE_MPI    
1445       if(CmiNumPartition()!=1){
1446         CpvAccess(recvdProcChkp) = 0;
1447         CpvAccess(recvdArrayChkp) = 0;
1448         CpvAccess(curPointer)^=1;
1449         //notify my replica, restart is done
1450         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1451         CmiSetHandler(msg,replicaRecoverHandlerIdx);
1452         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1453       }
1454       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1455         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1456       }
1457 #endif
1458
1459 #if CK_NO_PROC_POOL
1460 #if NODE_CHECKPOINT
1461       int numnodes = CmiNumPhysicalNodes();
1462 #else
1463       int numnodes = CkNumPes();
1464 #endif
1465       if (numnodes-totalFailed() <=2) {
1466         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1467         _memChkptOn = 0;
1468       }
1469 #endif
1470     }
1471
1472     void CkMemCheckPT::recoverFromSoftFailure()
1473     {
1474       inRestarting = 0;
1475           CpvAccess(recvdRemote) = 0;
1476           CpvAccess(recvdLocal) = 0;
1477           CpvAccess(localChkpDone) = 0;
1478           CpvAccess(remoteChkpDone) = 0;
1479           inCheckpointing = 0;
1480           notifyReplica = 0;
1481           if(CkMyPe() == 0){
1482                 CmiPrintf("[%d][%d] Recover From soft failures, sending callback ... \n", CmiMyPartition(),CkMyPe());
1483           }
1484       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1485     }
1486     // called only on 0
1487     void CkMemCheckPT::quiescence(CkCallback &cb)
1488     {
1489       static int pe_count = 0;
1490       pe_count ++;
1491       CmiAssert(CkMyPe() == 0);
1492       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1493       if (pe_count == CkNumPes()) {
1494         pe_count = 0;
1495         cb.send();
1496       }
1497     }
1498
1499     // User callable function - to start a checkpoint
1500     // callback cb is used to pass control back
1501     void CkStartMemCheckpoint(CkCallback &cb)
1502     {
1503 #if CMK_MEM_CHECKPOINT
1504       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1505       /*if (_memChkptOn == 0) {
1506         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1507         cb.send();
1508         return;
1509       }*/
1510       if (CkInRestarting()) {
1511         // trying to checkpointing during restart
1512         cb.send();
1513         return;
1514       }
1515       // store user callback and user data
1516       CkMemCheckPT::cpCallback = cb;
1517
1518       // broadcast to start check pointing
1519       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1520       checkptMgr.doItNow(CkMyPe(), cb);
1521 #else
1522       // when mem checkpoint is disabled, invike cb immediately
1523       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1524       cb.send();
1525 #endif
1526     }
1527
1528     void CkRestartCheckPoint(int diePe)
1529     {
1530       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1531       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1532       // broadcast
1533       checkptMgr.restart(diePe);
1534     }
1535
1536     static int _diePE = -1;
1537
1538     // callback function used locally by ccs handler
1539     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1540     {
1541       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1542       CkRestartCheckPoint(_diePE);
1543     }
1544
1545
1546     // called on crashed PE
1547     static void restartBeginHandler(char *msg)
1548     {
1549       CmiFree(msg);
1550 #if CMK_MEM_CHECKPOINT
1551 #if CMK_USE_BARRIER
1552       if(CkMyPe()!=_diePE){
1553         printf("restar begin on %d\n",CkMyPe());
1554         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1555         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1556         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1557       }else{
1558         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1559         CkRestartCheckPointCallback(NULL, NULL);
1560       }
1561 #else
1562       static int count = 0;
1563       CmiAssert(CkMyPe() == _diePE);
1564       count ++;
1565       if (count == CkNumPes()) {
1566         printf("restart begin on %d\n",CkMyPe());
1567         CkRestartCheckPointCallback(NULL, NULL);
1568         count = 0;
1569       }
1570 #endif
1571 #endif
1572     }
1573
1574     extern void _discard_charm_message();
1575     extern void _resume_charm_message();
1576
1577     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1578       return data;
1579     }
1580
1581     static void restartBcastHandler(char *msg)
1582     {
1583 #if CMK_MEM_CHECKPOINT
1584       // advance phase counter
1585       CkMemCheckPT::inRestarting = 1;
1586       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1587       // gzheng
1588       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1589
1590       if (CkMyPe()==_diePE)
1591         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1592
1593       // reset QD counters
1594       /*  gzheng
1595           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1596        */
1597
1598       /*  gzheng
1599           if (CkMyPe()==_diePE)
1600           CkRestartCheckPointCallback(NULL, NULL);
1601        */
1602       CmiFree(msg);
1603
1604       _resume_charm_message();
1605
1606       // reduction
1607       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1608       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1609 #if CMK_USE_BARRIER
1610       //CmiPrintf("before reduce\n");   
1611       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1612       //CmiPrintf("after reduce\n");    
1613 #else
1614       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1615 #endif 
1616       checkpointed = 0;
1617 #endif
1618     }
1619
1620     extern void _initDone();
1621
1622     bool compare(char * buf1, char *buf2){
1623         PUP::checker pchecker(buf1,buf2);
1624                 pchecker.skip();
1625
1626                 int numElements;
1627                 pchecker|numElements;
1628                 for(int i=0;i<numElements;i++){
1629       CkGroupID gID;
1630       CkArrayIndex idx;
1631
1632       pchecker|gID;
1633       pchecker|idx;
1634
1635       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1636       mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1637       }
1638       return pchecker.getResult();
1639       //return true;
1640     }
1641
1642     static void recvRemoteChkpHandler(char *msg){
1643       envelope *env = (envelope *)msg;
1644       CkUnpackMessage(&env);
1645       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1646       if(CpvAccess(recvdLocal)==1){
1647         int pointer = CpvAccess(curPointer);
1648         int size = CpvAccess(chkpBuf)[pointer]->len;
1649         CkPrintf("[%d][%d] checkpoint size %d pointer %d \n",CmiMyPartition(),CkMyPe(),size,pointer);
1650         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1651           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1652         }else
1653         {
1654                 CkPrintf("[%d][%d] failed the test my size %d buddy size %d pointer %d\n",CmiMyPartition(),CkMyPe(),size,chkpMsg->len,pointer);
1655           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1656         }
1657         delete chkpMsg;
1658         if(CkMyPe()==0)
1659           CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1660       }else{
1661         CpvAccess(recvdRemote) = 1;
1662         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1663         CpvAccess(buddyBuf) = chkpMsg;
1664       }  
1665     }
1666
1667     static void replicaRecoverHandler(char *msg){
1668       CpvAccess(_remoteCrashedNode) = -1;
1669       CkMemCheckPT::replicaAlive = 1;
1670       //fflush(stdout);
1671       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1672       bool ret = true;
1673       CpvAccess(remoteChkpDone) = 1;
1674       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
1675       CmiFree(msg);
1676   
1677     }
1678     static void replicaChkpDoneHandler(char *msg){
1679       CpvAccess(remoteChkpDone) = 1;
1680       if(CpvAccess(localChkpDone) == 1)
1681         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1682       CmiFree(msg);
1683     }
1684
1685     static void replicaDieHandler(char * msg){
1686 #if CMK_CONVERSE_MPI    
1687       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1688       CpvAccess(_remoteCrashedNode) = diePe;
1689       CkMemCheckPT::replicaAlive = 0;
1690       if(CkMyPe()==diePe){
1691         CmiPrintf("pe %d in replicad word die\n",diePe);
1692         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1693         fflush(stdout);
1694       }
1695       find_spare_mpirank(diePe,CmiMyPartition()^1);
1696 #endif
1697       //broadcast to my partition to get local max iter
1698       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1699       CmiFree(msg);
1700     }
1701
1702
1703     static void replicaDieBcastHandler(char *msg){
1704       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1705       CpvAccess(_remoteCrashedNode) = diePe;
1706       CkMemCheckPT::replicaAlive = 0;
1707       CmiFree(msg);
1708     }
1709
1710     static void recoverRemoteProcDataHandler(char *msg){
1711       if(CmiMyPartition()==1&&CkMyPe()==0){
1712         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1713         fflush(stdout);
1714       }
1715       envelope *env = (envelope *)msg;
1716       CkUnpackMessage(&env);
1717       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1718
1719       //store the checkpoint
1720       int pointer = procMsg->pointer;
1721
1722
1723       if(CkMyPe()==CpvAccess(_crashedNode)){
1724         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1725         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1726         PUP::fromMem p(procMsg->packData);
1727         _handleProcData(p,CmiTrue);
1728         _initDone();
1729       }
1730       else{
1731         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1732         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1733         //_handleProcData(p,CmiFalse);
1734       }
1735       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1736       CKLOCMGR_LOOP(mgr->startInserting(););
1737
1738       CpvAccess(recvdProcChkp) =1;
1739       if(CpvAccess(recvdArrayChkp)==1){
1740         _resume_charm_message();
1741         _diePE = CpvAccess(_crashedNode);
1742         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1743         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1744         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1745       }
1746       if(CmiMyPartition()==1&&CkMyPe()==0){
1747         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1748         fflush(stdout);
1749       }
1750     }
1751
1752     static void recoverRemoteArrayDataHandler(char *msg){
1753       envelope *env = (envelope *)msg;
1754       CkUnpackMessage(&env);
1755       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1756       if(CmiMyPartition()==1&&CkMyPe()==0){
1757         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1758         fflush(stdout);
1759       }
1760
1761       //store the checkpoint
1762       int pointer = chkpMsg->pointer;
1763       CpvAccess(curPointer) = pointer;
1764       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1765       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1766       CpvAccess(recvdArrayChkp) =1;
1767       CkMemCheckPT::inRestarting = 1;
1768       if(CpvAccess(recvdProcChkp) == 1){
1769         _resume_charm_message();
1770         _diePE = CpvAccess(_crashedNode);
1771         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1772         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1773         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1774         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1775       }
1776       if(CmiMyPartition()==1&&CkMyPe()==0){
1777         CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
1778         fflush(stdout);
1779       }
1780     }
1781
1782     static void recvPhaseHandler(char * msg)
1783     {
1784       CpvAccess(_curRestartPhase)--;
1785       CkMemCheckPT::inRestarting = 1;
1786       CmiFree(msg);
1787     }
1788     // called on crashed processor
1789     static void recoverProcDataHandler(char *msg)
1790     {
1791 #if CMK_MEM_CHECKPOINT
1792       int i;
1793       envelope *env = (envelope *)msg;
1794       CkUnpackMessage(&env);
1795       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1796       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1797       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1798       PUP::fromMem p(procMsg->packData);
1799       _handleProcData(p,CmiTrue);
1800
1801       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1802       // gzheng
1803       CKLOCMGR_LOOP(mgr->startInserting(););
1804
1805       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1806       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1807       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1808       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1809
1810       _initDone();
1811       //   CpvAccess(_qd)->flushStates();
1812       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1813 #endif
1814     }
1815     //for replica, got the phase number from my neighbor processor in the current partition
1816     static void askPhaseHandler(char *msg)
1817     {
1818 #if CMK_MEM_CHECKPOINT
1819       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1820       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1821       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1822       CmiSetHandler(msg, recvPhaseHandlerIdx);
1823       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1824 #endif
1825     }
1826     // called on its backup processor
1827     // get backup message buffer and sent to crashed processor
1828     static void askProcDataHandler(char *msg)
1829     {
1830 #if CMK_MEM_CHECKPOINT
1831       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1832       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1833       if (CpvAccess(procChkptBuf) == NULL)  {
1834         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1835         CkAbort("no checkpoint found");
1836       }
1837       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1838       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1839
1840       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1841
1842       CkPackMessage(&env);
1843       CmiSetHandler(env, recoverProcDataHandlerIdx);
1844       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1845       CpvAccess(procChkptBuf) = NULL;
1846       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1847 #endif
1848     }
1849
1850     // called on PE 0
1851     void qd_callback(void *m)
1852     {
1853       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1854         fflush(stdout);
1855       CkFreeMsg(m);
1856       if(CmiNumPartition()==1){
1857 #ifdef CMK_SMP
1858         for(int i=0;i<CmiMyNodeSize();i++){
1859           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1860           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1861           CmiSetHandler(msg, askProcDataHandlerIdx);
1862           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1863           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1864         }
1865         return;
1866 #endif
1867         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1868         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1869         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1870         CmiSetHandler(msg, askProcDataHandlerIdx);
1871         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1872         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1873       }
1874       else{
1875         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1876         CmiSetHandler(msg, recvPhaseHandlerIdx);
1877         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1878       }
1879     }
1880
1881     // on crashed node
1882     void CkMemRestart(const char *dummy, CkArgMsg *args)
1883     {
1884 #if CMK_MEM_CHECKPOINT
1885       _diePE = CmiMyNode();
1886       CpvAccess( _crashedNode )= CmiMyNode();
1887       CkMemCheckPT::inRestarting = 1;
1888       _discard_charm_message();
1889
1890       /*if(CmiMyRank()==0){
1891         CkCallback cb(qd_callback);
1892         CkStartQD(cb);
1893         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1894         }*/
1895       if(CmiNumPartition()==1){
1896         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1897         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1898         restartT = CmiWallTimer();
1899         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1900         fflush(stdout);
1901         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1902         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1903         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1904         CmiSetHandler(msg, askProcDataHandlerIdx);
1905         int pe = ChkptOnPe(CpvAccess(_crashedNode));
1906         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1907       }
1908       else{
1909         CkCallback cb(qd_callback);
1910         CkStartQD(cb);
1911       }
1912 #else
1913       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1914 #endif
1915     }
1916
1917     // can be called in other files
1918     // return true if it is in restarting
1919     extern "C"
1920       int CkInRestarting()
1921       {
1922 #if CMK_MEM_CHECKPOINT
1923         if (CpvAccess( _crashedNode)!=-1) return 1;
1924         // gzheng
1925         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1926         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1927         return CkMemCheckPT::inRestarting;
1928 #else
1929         return 0;
1930 #endif
1931       }
1932
1933     extern "C"
1934       int CkReplicaAlive()
1935       {
1936         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1937         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1938         return CkMemCheckPT::replicaAlive;
1939
1940         /*if(CkMemCheckPT::replicaDead==1)
1941           return 0;
1942           else          
1943           return 1;*/
1944       }
1945
1946     extern "C"
1947       int CkInCheckpointing()
1948       {
1949         return CkMemCheckPT::inCheckpointing;
1950       }
1951
1952     extern "C"
1953       void CkSetInLdb(){
1954 #if CMK_MEM_CHECKPOINT
1955         CkMemCheckPT::inLoadbalancing = 1;
1956 #endif
1957       }
1958
1959     extern "C"
1960       int CkInLdb(){
1961 #if CMK_MEM_CHECKPOINT
1962         return CkMemCheckPT::inLoadbalancing;
1963 #endif
1964         return 0;
1965       }
1966
1967     extern "C"
1968       void CkResetInLdb(){
1969 #if CMK_MEM_CHECKPOINT
1970         CkMemCheckPT::inLoadbalancing = 0;
1971 #endif
1972       }
1973
1974     /*****************************************************************************
1975       module initialization
1976      *****************************************************************************/
1977
1978     static int arg_where = CkCheckPoint_inMEM;
1979
1980 #if CMK_MEM_CHECKPOINT
1981     void init_memcheckpt(char **argv)
1982     {
1983       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1984         arg_where = CkCheckPoint_inDISK;
1985       }
1986       // initiliazing _crashedNode variable
1987       CpvInitialize(int, _crashedNode);
1988       CpvInitialize(int, _remoteCrashedNode);
1989       CpvAccess(_crashedNode) = -1;
1990       CpvAccess(_remoteCrashedNode) = -1;
1991       init_FI(argv);
1992     }
1993 #endif
1994
1995     class CkMemCheckPTInit: public Chare {
1996       public:
1997         CkMemCheckPTInit(CkArgMsg *m) {
1998 #if CMK_MEM_CHECKPOINT
1999           if (arg_where == CkCheckPoint_inDISK) {
2000             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2001           }
2002           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2003           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2004 #endif
2005         }
2006     };
2007
2008     static void notifyHandler(char *msg)
2009     {
2010 #if CMK_MEM_CHECKPOINT
2011       CmiFree(msg);
2012       /* immediately increase restart phase to filter old messages */
2013       CpvAccess(_curRestartPhase) ++;
2014       CpvAccess(_qd)->flushStates();
2015       _discard_charm_message();
2016
2017 #endif
2018     }
2019
2020     extern "C"
2021       void notify_crash(int node)
2022       {
2023 #ifdef CMK_MEM_CHECKPOINT
2024         CpvAccess( _crashedNode) = node;
2025 #ifdef CMK_SMP
2026         for(int i=0;i<CkMyNodeSize();i++){
2027           CpvAccessOther(_crashedNode,i)=node;
2028         }
2029 #endif
2030         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2031         CkMemCheckPT::inRestarting = 1;
2032
2033         // this may be in interrupt handler, send a message to reset QD
2034         int pe = CmiNodeFirst(CkMyNode());
2035         for(int i=0;i<CkMyNodeSize();i++){
2036           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2037           CmiSetHandler(msg, notifyHandlerIdx);
2038           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2039         }
2040 #endif
2041       }
2042
2043     extern "C" void (*notify_crash_fn)(int node);
2044
2045
2046 #if CMK_CONVERSE_MPI
2047     void buddyDieHandler(char *msg)
2048     {
2049 #if CMK_MEM_CHECKPOINT
2050       // notify
2051       CkMemCheckPT::inRestarting = 1;
2052       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2053       notify_crash(diepe);
2054       // send message to crash pe to let it restart
2055       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2056       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2057       int buddy = obj->BuddyPE(CmiMyPe());
2058       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2059       if (buddy == diepe)  {
2060         mpi_restart_crashed(diepe, newrank);
2061       }
2062 #endif
2063     }
2064
2065     void pingHandler(void *msg)
2066     {
2067       lastPingTime = CmiWallTimer();
2068       CmiFree(msg);
2069     }
2070
2071     void pingCheckHandler()
2072     {
2073 #if CMK_MEM_CHECKPOINT
2074       double now = CmiWallTimer();
2075       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2076         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2077         int i, pe, buddy;
2078         // tell everyone the buddy dies
2079         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2080         for (i = 1; i < CmiNumPes(); i++) {
2081           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2082           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2083         }
2084         buddy = pe;
2085         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2086         fflush(stdout);
2087         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2088         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2089         CmiSetHandler(msg, buddyDieHandlerIdx);
2090         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2091         //send to everyone in the other world
2092         if(CmiNumPartition()!=1){
2093           for(int i=0;i<CmiNumPes();i++){
2094             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2095             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2096             //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
2097             //fflush(stdout);
2098             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2099             CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2100           }
2101         }
2102       }
2103         else 
2104           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2105 #endif
2106       }
2107
2108       void pingBuddy()
2109       {
2110 #if CMK_MEM_CHECKPOINT
2111         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2112         if (obj) {
2113           int buddy = obj->BuddyPE(CkMyPe());
2114           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2115           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2116           CmiSetHandler(msg, pingHandlerIdx);
2117           CmiGetRestartPhase(msg) = 9999;
2118           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2119         }
2120         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2121 #endif
2122       }
2123 #endif
2124
2125       // initproc
2126       void CkRegisterRestartHandler( )
2127       {
2128 #if CMK_MEM_CHECKPOINT
2129         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2130         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2131         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2132         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2133         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2134
2135         //for replica
2136         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2137         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2138         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2139         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2140         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2141         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2142         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2143         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2144         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2145
2146 #if CMK_CONVERSE_MPI
2147         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2148         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2149         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2150 #endif
2151
2152         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2153         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2154         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2155         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2156         CpvInitialize(int,curPointer);
2157         CpvInitialize(int,recvdLocal);
2158         CpvInitialize(int,localChkpDone);
2159         CpvInitialize(int,remoteChkpDone);
2160         CpvInitialize(int,recvdRemote);
2161         CpvInitialize(int,recvdProcChkp);
2162         CpvInitialize(int,recvdArrayChkp);
2163
2164         CpvAccess(procChkptBuf) = NULL;
2165         CpvAccess(buddyBuf) = NULL;
2166         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2167         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2168         CpvAccess(chkpBuf)[0] = NULL;
2169         CpvAccess(chkpBuf)[1] = NULL;
2170         CpvAccess(localProcChkpBuf)[0] = NULL;
2171         CpvAccess(localProcChkpBuf)[1] = NULL;
2172
2173         CpvAccess(curPointer) = 0;
2174         CpvAccess(recvdLocal) = 0;
2175         CpvAccess(localChkpDone) = 0;
2176         CpvAccess(remoteChkpDone) = 0;
2177         CpvAccess(recvdRemote) = 0;
2178         CpvAccess(recvdProcChkp) = 0;
2179         CpvAccess(recvdArrayChkp) = 0;
2180
2181         notify_crash_fn = notify_crash;
2182
2183 #if ! CMK_CONVERSE_MPI
2184         // print pid to kill
2185         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2186         //  sleep(4);
2187 #endif
2188 #endif
2189       }
2190
2191
2192       extern "C"
2193         int CkHasCheckpoints()
2194         {
2195           return checkpointed;
2196         }
2197
2198       /// @todo: the following definitions should be moved to a separate file containing
2199       // structures and functions about fault tolerance strategies
2200
2201       /**
2202        *  * @brief: function for killing a process                                             
2203        *   */
2204 #ifdef CMK_MEM_CHECKPOINT
2205 #if CMK_HAS_GETPID
2206       void killLocal(void *_dummy,double curWallTime){
2207         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2208         if(CmiWallTimer()<killTime-1){
2209           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2210         }else{ 
2211 #if CMK_CONVERSE_MPI
2212           CkDieNow();
2213 #else 
2214           kill(getpid(),SIGKILL);                                               
2215 #endif
2216         }              
2217       } 
2218 #else
2219       void killLocal(void *_dummy,double curWallTime){
2220         CmiAbort("kill() not supported!");
2221       }
2222 #endif
2223 #endif
2224
2225 #ifdef CMK_MEM_CHECKPOINT
2226       /**
2227        * @brief: reads the file with the kill information
2228        */
2229       void readKillFile(){
2230         FILE *fp=fopen(killFile,"r");
2231         if(!fp){
2232           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2233           return;
2234         }
2235         int proc;
2236         double sec;
2237         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2238           if(proc == CkMyNode() && CkMyRank() == 0){
2239             killTime = CmiWallTimer()+sec;
2240             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2241             CcdCallFnAfter(killLocal,NULL,sec*1000);
2242           }
2243         }
2244         fclose(fp);
2245       }
2246
2247 #if ! CMK_CONVERSE_MPI
2248       void CkDieNow()
2249       {
2250 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2251         // ignored for non-mpi version
2252         CmiPrintf("[%d] die now.\n", CmiMyPe());
2253         killTime = CmiWallTimer()+0.001;
2254         CcdCallFnAfter(killLocal,NULL,1);
2255 #endif
2256       }
2257 #endif
2258
2259 #endif
2260
2261 #include "CkMemCheckpoint.def.h"
2262
2263