add pup support
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define CMK_CHKP_ALL            1
69 #define CMK_USE_BARRIER         0
70
71 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
72 #define STREAMING_INFORMHOME                    1
73 CpvDeclare(int, _crashedNode);
74 CpvDeclare(int, _remoteCrashedNode);
75
76 // static, so that it is accessible from Converse part
77 int CkMemCheckPT::inRestarting = 0;
78 int CkMemCheckPT::inCheckpointing = 0;
79 int CkMemCheckPT::replicaAlive = 1;
80 int CkMemCheckPT::inLoadbalancing = 0;
81 double CkMemCheckPT::startTime;
82 char *CkMemCheckPT::stage;
83 CkCallback CkMemCheckPT::cpCallback;
84
85 int _memChkptOn = 1;                    // checkpoint is on or off
86
87 CkGroupID ckCheckPTGroupID;             // readonly
88
89 static int checkpointed = 0;
90
91 /// @todo the following declarations should be moved into a separate file for all 
92 // fault tolerant strategies
93
94 #ifdef CMK_MEM_CHECKPOINT
95 // name of the kill file that contains processes to be killed 
96 char *killFile;                                               
97 // flag for the kill file         
98 int killFlag=0;
99 // variable for storing the killing time
100 double killTime=0.0;
101 #endif
102
103 #ifdef CKLOCMGR_LOOP
104 #undef CKLOCMGR_LOOP
105 #endif
106 // loop over all CkLocMgr and do "code"
107 #define  CKLOCMGR_LOOP(code)    {       \
108   int numGroups = CkpvAccess(_groupIDTable)->size();    \
109   for(int i=0;i<numGroups;i++) {        \
110     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
111     if(obj->isLocMgr())  {      \
112       CkLocMgr *mgr = (CkLocMgr*)obj;   \
113       code      \
114     }   \
115   }     \
116  }
117
118 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
119 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
120 CpvDeclare(CkCheckPTMessage**, chkpBuf);
121 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
122 //store the checkpoint of the buddy to compare
123 //do not need the whole msg, can be the checksum
124 CpvDeclare(CkCheckPTMessage*, buddyBuf);
125 //pointer of the checkpoint going to be written
126 CpvDeclare(int, curPointer);
127 CpvDeclare(int, recvdRemote);
128 CpvDeclare(int, recvdLocal);
129 CpvDeclare(int, recvdArrayChkp);
130 CpvDeclare(int, recvdProcChkp);
131
132 bool compare(char * buf1, char * buf2);
133 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
134 // Converse function handles
135 static int askPhaseHandlerIdx;
136 static int recvPhaseHandlerIdx;
137 static int askProcDataHandlerIdx;
138 static int restartBcastHandlerIdx;
139 static int recoverProcDataHandlerIdx;
140 static int restartBeginHandlerIdx;
141 static int recvRemoteChkpHandlerIdx;
142 static int replicaDieHandlerIdx;
143 static int replicaDieBcastHandlerIdx;
144 static int replicaRecoverHandlerIdx;
145 static int recoverRemoteProcDataHandlerIdx;
146 static int recoverRemoteArrayDataHandlerIdx;
147 static int notifyHandlerIdx;
148 // compute the backup processor
149 // FIXME: avoid crashed processors
150 #if CMK_CONVERSE_MPI
151 static int pingHandlerIdx;
152 static int pingCheckHandlerIdx;
153 static int buddyDieHandlerIdx;
154 static double lastPingTime = -1;
155
156 extern "C" void mpi_restart_crashed(int pe, int rank);
157 extern "C" int  find_spare_mpirank(int pe,int partition);
158
159 void pingBuddy();
160 void pingCheckHandler();
161
162 #endif
163 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
164
165 inline int CkMemCheckPT::BuddyPE(int pe)
166 {
167   int budpe;
168 #if NODE_CHECKPOINT
169     // buddy is the processor with same rank on the next physical node
170   int r1 = CmiPhysicalRank(pe);
171   int budnode = CmiPhysicalNodeID(pe);
172   do {
173     budnode = (budnode+1)%CmiNumPhysicalNodes();
174     int *pelist;
175     int num;
176     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
177     budpe = pelist[r1 % num];
178   } while (isFailed(budpe));
179   if (budpe == pe) {
180     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
181     CmiAbort("Failed to find a buddy processor");
182   }
183 #else
184   budpe = pe;
185   while (budpe == pe || isFailed(budpe)) 
186           budpe = (budpe+1)%CkNumPes();
187 #endif
188   return budpe;
189 }
190
191 // called in array element constructor
192 // choose and register with 2 buddies for checkpoiting 
193 #if CMK_MEM_CHECKPOINT
194 void ArrayElement::init_checkpt() {
195         if (_memChkptOn == 0) return;
196         if (CkInRestarting()) {
197           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
198         }
199         // only master init checkpoint
200         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
201
202         budPEs[0] = CkMyPe();
203         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
204         CmiAssert(budPEs[0] != budPEs[1]);
205         // inform checkPTMgr
206         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
207         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
208         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
209         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
210 }
211 #endif
212
213 // entry function invoked by checkpoint mgr asking for checkpoint data
214 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
215 #if CMK_MEM_CHECKPOINT
216 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
217 //char index[128];   thisIndexMax.sprint(index);
218 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
219   CkLocMgr *locMgr = thisArray->getLocMgr();
220   CmiAssert(myRec!=NULL);
221   int size;
222   {
223         PUP::sizer p;
224         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
225         size = p.size();
226   }
227   int packSize = size/sizeof(double) +1;
228   CkArrayCheckPTMessage *msg =
229                  new (packSize, 0) CkArrayCheckPTMessage;
230   msg->len = size;
231   msg->index =thisIndexMax;
232   msg->aid = thisArrayID;
233   msg->locMgr = locMgr->getGroupID();
234   msg->cp_flag = 1;
235   {
236         PUP::toMem p(msg->packData);
237         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
238   }
239
240   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
241   checkptMgr.recvData(msg, 2, budPEs);
242   delete m;
243 #endif
244 }
245
246 // checkpoint holder class - for memory checkpointing
247 class CkMemCheckPTInfo: public CkCheckPTInfo
248 {
249   CkArrayCheckPTMessage *ckBuffer;
250 public:
251   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
252                     CkCheckPTInfo(a, loc, idx, pno)
253   {
254     ckBuffer = NULL;
255   }
256   ~CkMemCheckPTInfo() 
257   {
258     if (ckBuffer) delete ckBuffer; 
259   }
260   inline void updateBuffer(CkArrayCheckPTMessage *data) 
261   {
262     CmiAssert(data!=NULL);
263     if (ckBuffer) delete ckBuffer;
264     ckBuffer = data;
265   }    
266   inline CkArrayCheckPTMessage * getCopy()
267   {
268     if (ckBuffer == NULL) {
269       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
270       CmiAbort("Abort!");
271     }
272     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
273   }     
274   inline void updateBuddy(int b1, int b2) {
275      CmiAssert(ckBuffer);
276      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
277      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
278      CmiAssert(pNo != CkMyPe());
279   }
280   inline int getSize() { 
281      CmiAssert(ckBuffer);
282      return ckBuffer->len; 
283   }
284 };
285
286 // checkpoint holder class - for in-disk checkpointing
287 class CkDiskCheckPTInfo: public CkCheckPTInfo 
288 {
289   char *fname;
290   int bud1, bud2;
291   int len;                      // checkpoint size
292 public:
293   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
294   {
295 #if CMK_USE_MKSTEMP
296     fname = new char[64];
297     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
298     mkstemp(fname);
299 #else
300     fname=tmpnam(NULL);
301 #endif
302     bud1 = bud2 = -1;
303     len = 0;
304   }
305   ~CkDiskCheckPTInfo() 
306   {
307     remove(fname);
308   }
309   inline void updateBuffer(CkArrayCheckPTMessage *data) 
310   {
311     double t = CmiWallTimer();
312     // unpack it
313     envelope *env = UsrToEnv(data);
314     CkUnpackMessage(&env);
315     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
316     FILE *f = fopen(fname,"wb");
317     PUP::toDisk p(f);
318     CkPupMessage(p, (void **)&data);
319     // delay sync to the end because otherwise the messages are blocked
320 //    fsync(fileno(f));
321     fclose(f);
322     bud1 = data->bud1;
323     bud2 = data->bud2;
324     len = data->len;
325     delete data;
326     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
327   }
328   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
329   {
330     CkArrayCheckPTMessage *data;
331     FILE *f = fopen(fname,"rb");
332     PUP::fromDisk p(f);
333     CkPupMessage(p, (void **)&data);
334     fclose(f);
335     data->bud1 = bud1;                          // update the buddies
336     data->bud2 = bud2;
337     return data;
338   }
339   inline void updateBuddy(int b1, int b2) {
340      bud1 = b1; bud2 = b2;
341      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
342      CmiAssert(pNo != CkMyPe());
343   }
344   inline int getSize() { 
345      return len; 
346   }
347 };
348
349 CkMemCheckPT::CkMemCheckPT(int w)
350 {
351   int numnodes = 0;
352 #if NODE_CHECKPOINT
353   numnodes = CmiNumPhysicalNodes();
354 #else
355   numnodes = CkNumPes();
356 #endif
357 #if CK_NO_PROC_POOL
358   if (numnodes <= 2)
359 #else
360   if (numnodes  == 1)
361 #endif
362   {
363     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
364     _memChkptOn = 0;
365   }
366   inRestarting = 0;
367   inCheckpointing = 0;
368   recvCount = peCount = 0;
369   ackCount = 0;
370   expectCount = -1;
371   where = w;
372   replicaAlive = 1;
373 #if CMK_CONVERSE_MPI
374   void pingBuddy();
375   void pingCheckHandler();
376   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
377   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
378 #endif
379   chkpTable[0] = NULL;
380   chkpTable[1] = NULL;
381 }
382
383 CkMemCheckPT::~CkMemCheckPT()
384 {
385   int len = ckTable.length();
386   for (int i=0; i<len; i++) {
387     delete ckTable[i];
388   }
389 }
390
391 void CkMemCheckPT::pup(PUP::er& p) 
392
393   CBase_CkMemCheckPT::pup(p); 
394   p|cpStarter;
395   p|thisFailedPe;
396   p|failedPes;
397   p|ckCheckPTGroupID;           // recover global variable
398   p|cpCallback;                 // store callback
399   p|where;                      // where to checkpoint
400   p|peCount;
401   if (p.isUnpacking()) {
402     recvCount = 0;
403 #if CMK_CONVERSE_MPI
404     void pingBuddy();
405     void pingCheckHandler();
406     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
407     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
408 #endif
409   }
410 }
411
412 // called by checkpoint mgr to restore an array element
413 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
414 {
415 #if CMK_MEM_CHECKPOINT
416   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
417   // m->index.print();
418   PUP::fromMem p(m->packData);
419   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
420   CmiAssert(mgr);
421 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
422   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
423 #else
424   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
425 #endif
426
427   // find a list of array elements bound together
428   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
429   CmiAssert(elt);
430   CkLocRec_local *rec = elt->myRec;
431   CkVec<CkMigratable *> list;
432   mgr->migratableList(rec, list);
433   CmiAssert(list.length() > 0);
434   for (int l=0; l<list.length(); l++) {
435     elt = (ArrayElement *)list[l];
436     elt->budPEs[0] = m->bud1;
437     elt->budPEs[1] = m->bud2;
438     //    reset, may not needed now
439     // for now.
440     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
441       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
442       if (c) c->redNo = 0;
443     }
444   }
445 #endif
446 }
447
448 // return 1 if pe is a crashed processor
449 int CkMemCheckPT::isFailed(int pe)
450 {
451   for (int i=0; i<failedPes.length(); i++)
452     if (failedPes[i] == pe) return 1;
453   return 0;
454 }
455
456 // add pe into history list of all failed processors
457 void CkMemCheckPT::failed(int pe)
458 {
459   if (isFailed(pe)) return;
460   failedPes.push_back(pe);
461 }
462
463 int CkMemCheckPT::totalFailed()
464 {
465   return failedPes.length();
466 }
467
468 // create an checkpoint entry for array element of aid with index.
469 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
470 {
471   // error check, no duplicate
472   int idx, len = ckTable.size();
473   for (idx=0; idx<len; idx++) {
474     CkCheckPTInfo *entry = ckTable[idx];
475     if (index == entry->index) {
476       if (loc == entry->locMgr) {
477           // bindTo array elements
478           return;
479       }
480         // for array inheritance, the following check may fail
481         // because ArrayElement constructor of all superclasses are called
482       if (aid == entry->aid) {
483         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
484         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
485       }
486     }
487   }
488   CkCheckPTInfo *newEntry;
489   if (where == CkCheckPoint_inMEM)
490     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
491   else
492     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
493   ckTable.push_back(newEntry);
494   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
495 }
496
497 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
498 {
499 #if !CMK_CHKP_ALL       
500   int buddy = msg->bud1;
501   if (buddy == CkMyPe()) buddy = msg->bud2;
502   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
503   recvData(msg);
504     // ack
505   thisProxy[buddy].gotData();
506 #else
507   chkpTable[0] = NULL;
508   chkpTable[1] = NULL;
509   recvArrayCheckpoint(msg);
510   thisProxy[msg->bud2].gotData();
511 #endif
512 }
513
514 // loop through my checkpoint table and ask checkpointed array elements
515 // to send me checkpoint data.
516 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
517 {
518   checkpointed = 1;
519   cpCallback = cb;
520   cpStarter = starter;
521   inCheckpointing = 1;
522   if (CkMyPe() == cpStarter) {
523     startTime = CmiWallTimer();
524     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
525   }
526 #if CMK_CONVERSE_MPI
527   if(CmiNumPartition()==1)
528 #endif    
529   {
530
531 #if !CMK_CHKP_ALL
532   int len = ckTable.length();
533   for (int i=0; i<len; i++) {
534     CkCheckPTInfo *entry = ckTable[i];
535       // always let the bigger number processor send request
536     //if (CkMyPe() < entry->pNo) continue;
537       // always let the smaller number processor send request, may on same proc
538     if (!isMaster(entry->pNo)) continue;
539       // call inmem_checkpoint to the array element, ask it to send
540       // back checkpoint data via recvData().
541     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
542     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
543   }
544     // if my table is empty, then I am done
545   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
546 #else
547   startArrayCheckpoint();
548 #endif
549         sendProcData();
550   }
551 #if CMK_CONVERSE_MPI
552   else
553   {
554         startCheckpoint();
555   }
556 #endif
557   // pack and send proc level data
558 }
559
560 class MemElementPacker : public CkLocIterator{
561         private:
562                 CkLocMgr *locMgr;
563                 PUP::er &p;
564         public:
565                 MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
566                 void addLocation(CkLocation &loc){
567                         CkArrayIndexMax idx = loc.getIndex();
568                         CkGroupID gID = locMgr->ckGetGroupID();
569                         p|gID;
570                         p|idx;
571                         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
572                 }
573 };
574
575 void pupAllElements(PUP::er &p){
576 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
577         int numElements;
578         if(!p.isUnpacking()){
579                 numElements = CkCountArrayElements();
580         }
581         p | numElements;
582         if(!p.isUnpacking()){
583                 CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
584         }
585 #endif
586 }
587
588 void CkMemCheckPT::startArrayCheckpoint(){
589 #if CMK_CHKP_ALL
590         int size;
591         {
592                 PUP::sizer psizer;
593                 pupAllElements(psizer);
594                 size = psizer.size();
595         }
596         int packSize = size/sizeof(double)+1;
597  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
598         CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
599         msg->len = size;
600         msg->cp_flag = 1;
601         int budPEs[2];
602         msg->bud1=CkMyPe();
603         msg->bud2=ChkptOnPe(CkMyPe());
604         {
605                 PUP::toMem p(msg->packData);
606                 pupAllElements(p);
607         }
608         thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
609         if(chkpTable[0]) delete chkpTable[0];
610         chkpTable[0] = msg;
611         //send the checkpoint to my 
612         recvCount++;
613 #endif
614 }
615
616 void CkMemCheckPT::startCheckpoint(){
617 #if CMK_CONVERSE_MPI
618         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
619     CkPrintf("in start checkpointing!!!!\n");
620   int size;
621   {
622     PUP::sizer p;
623     _handleProcData(p,CmiFalse);
624     size = p.size();
625   }
626         int packSize = size/sizeof(double)+1;
627         CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
628         procMsg->len = size;
629         procMsg->cp_flag = 1;
630         {
631                 PUP::toMem p(procMsg->packData);
632                 _handleProcData(p,CmiFalse);
633         }
634         int pointer = CpvAccess(curPointer);
635         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
636                 CpvAccess(localProcChkpBuf)[pointer] = procMsg;
637         
638         {
639                 PUP::sizer psizer;
640                 pupAllElements(psizer);
641                 size = psizer.size();
642         }
643         packSize = size/sizeof(double)+1;
644         CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
645         msg->len = size;
646         msg->cp_flag = 1;
647         {
648                 PUP::toMem p(msg->packData);
649                 pupAllElements(p);
650         }
651         pointer = CpvAccess(curPointer);
652         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
653     CkPrintf("start checkpointing!!!!\n");
654         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
655                 CpvAccess(chkpBuf)[pointer] = msg;
656         if(CkReplicaAlive()==1){
657                 CpvAccess(recvdLocal) = 1;
658                 envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
659                 CkPackMessage(&env);
660                 CmiSetHandler(env,recvRemoteChkpHandlerIdx);
661                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
662         }
663         if(CpvAccess(recvdRemote)==1){
664                 //compare the checkpoint 
665           int size = CpvAccess(chkpBuf)[pointer]->len;
666           if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
667                 thisProxy[CkMyPe()].doneComparison(true);
668           }else{
669                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
670                 thisProxy[CkMyPe()].doneComparison(false);
671           }
672   }
673         else{
674                 if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
675                         {       
676                                 int pointer = CpvAccess(curPointer);
677                                 //send the proc data
678                                 CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
679                                 procMsg->pointer = pointer;
680                                 envelope * env = (envelope *)(UsrToEnv(procMsg));
681                                 CkPackMessage(&env);
682                                 CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
683                                 CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
684                                 if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
685                                         CkPrintf("[%d] sendProcdata\n",CkMyPe());
686                                 }
687                         }
688                         //send the array checkpoint data
689                         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
690                         msg->pointer = CpvAccess(curPointer);
691                         envelope * env = (envelope *)(UsrToEnv(msg));
692                         CkPackMessage(&env);
693                         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
694                         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
695                         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
696                           CkPrintf("[%d] sendArraydata\n",CkMyPe());
697                         //can continue work, no need to wait for my replica
698                 }
699         }
700 #endif
701 }
702
703 void CkMemCheckPT::doneComparison(bool ret){
704         int _ret;
705         if(!ret){
706         CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
707                 _ret = 0;
708         }else{
709                 _ret = 1;
710         }
711         inCheckpointing = 0;
712         CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
713         contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
714 }
715
716 void CkMemCheckPT::doneRComparison(int ret){
717         CpvAccess(recvdRemote) = 0;
718         CpvAccess(recvdLocal) = 0;
719 //      if(CpvAccess(curPointer) == 0){
720         if(ret==1){
721         CpvAccess(curPointer)^=1;
722                 if(CkMyPe() == 0){
723                 CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
724                         cpCallback.send();
725                 }
726         }else{
727         CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
728                 RollBack();
729         }
730 }
731
732 void CkMemCheckPT::RollBack(){
733         //restore group data
734         checkpointed = 0;
735         CkMemCheckPT::inRestarting = 1;
736         int pointer = CpvAccess(curPointer)^1;//use the previous one
737     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
738         PUP::fromMem p(chkpMsg->packData);      
739         //_handleProcData(p);
740         
741         //destroy array elements
742         //CKLOCMGR_LOOP(mgr->flushLocalRecs(););
743           int numGroups = CkpvAccess(_groupIDTable)->size();
744           for(int i=0;i<numGroups;i++) {
745                 CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
746                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
747                 obj->flushStates();
748                 obj->ckJustMigrated();
749           }
750         //restore array elements
751         
752         int numElements;
753         p|numElements;
754         
755         if(p.isUnpacking()){
756                 for(int i=0;i<numElements;i++){
757                 //for(int i=0;i<1;i++){
758                         CkGroupID gID;
759                         CkArrayIndex idx;
760                         p|gID;
761                         p|idx;
762                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
763                         CmiAssert(mgr);
764                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
765                 }
766         }
767         CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
768         contribute(cb);
769 }
770
771 void CkMemCheckPT::notifyReplicaDie(int pe){
772         CkPrintf("[%d] receive replica die\n",CkMyPe());
773         replicaAlive = 0;
774         CpvAccess(_remoteCrashedNode) = pe;
775 }
776
777 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
778 {
779 #if CMK_CHKP_ALL
780         int idx = 1;
781         if(msg->bud1 == CkMyPe()){
782                 idx = 0;
783         }
784         int isChkpting = msg->cp_flag;
785         if(isChkpting == 1){
786                 if(chkpTable[idx]) delete chkpTable[idx];
787         }
788         chkpTable[idx] = msg;
789         if(isChkpting){
790                 recvCount++;
791                 if(recvCount == 2){
792                   if (where == CkCheckPoint_inMEM) {
793                         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
794                   }
795                   else if (where == CkCheckPoint_inDISK) {
796                         // another barrier for finalize the writing using fsync
797                         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
798                         contribute(0,NULL,CkReduction::sum_int,localcb);
799                   }
800                   else
801                         CmiAbort("Unknown checkpoint scheme");
802                   recvCount = 0;
803                 }
804         }
805 #endif
806 }
807
808 // don't handle array elements
809 static inline void _handleProcData(PUP::er &p, CmiBool create)
810 {
811     // save readonlys, and callback BTW
812     CkPupROData(p);
813
814     // save mainchares 
815     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
816         
817 #ifndef CMK_CHARE_USE_PTR
818     // save non-migratable chare
819     CkPupChareData(p);
820 #endif
821
822     // save groups into Groups.dat
823     CkPupGroupData(p,create);
824
825     // save nodegroups into NodeGroups.dat
826     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
827 }
828
829 void CkMemCheckPT::sendProcData()
830 {
831   // find out size of buffer
832   int size;
833   {
834     PUP::sizer p;
835     _handleProcData(p,CmiTrue);
836     size = p.size();
837   }
838   int packSize = size;
839   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
840   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
841   {
842     PUP::toMem p(msg->packData);
843     _handleProcData(p,CmiTrue);
844   }
845   msg->pe = CkMyPe();
846   msg->len = size;
847   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
848   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
849 }
850
851 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
852 {
853   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
854   CpvAccess(procChkptBuf) = msg;
855   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
856   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
857 }
858
859 // ArrayElement call this function to give us the checkpointed data
860 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
861 {
862   int len = ckTable.length();
863   int idx;
864   for (idx=0; idx<len; idx++) {
865     CkCheckPTInfo *entry = ckTable[idx];
866     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
867   }
868   CkAssert(idx < len);
869   int isChkpting = msg->cp_flag;
870   ckTable[idx]->updateBuffer(msg);
871   if (isChkpting) {
872       // all my array elements have returned their inmem data
873       // inform starter processor that I am done.
874     recvCount ++;
875     if (recvCount == ckTable.length()) {
876       if (where == CkCheckPoint_inMEM) {
877         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
878       }
879       else if (where == CkCheckPoint_inDISK) {
880         // another barrier for finalize the writing using fsync
881         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
882         contribute(0,NULL,CkReduction::sum_int,localcb);
883       }
884       else
885         CmiAbort("Unknown checkpoint scheme");
886       recvCount = 0;
887     } 
888   }
889 }
890
891 // only used in disk checkpointing
892 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
893 {
894   delete m;
895 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
896   system("sync");
897 #endif
898   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
899 }
900
901 // only is called on cpStarter when checkpoint is done
902 void CkMemCheckPT::cpFinish()
903 {
904   CmiAssert(CkMyPe() == cpStarter);
905   peCount++;
906     // now that all processors have finished, activate callback
907   if (peCount == 2) 
908 {
909     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
910     cpCallback.send();
911     peCount = 0;
912     thisProxy.report();
913   }
914 }
915
916 // for debugging, report checkpoint info
917 void CkMemCheckPT::report()
918 {
919         inCheckpointing = 0;
920 #if !CMK_CHKP_ALL
921   int objsize = 0;
922   int len = ckTable.length();
923   for (int i=0; i<len; i++) {
924     CkCheckPTInfo *entry = ckTable[i];
925     CmiAssert(entry);
926     objsize += entry->getSize();
927   }
928   CmiAssert(CpvAccess(procChkptBuf));
929   //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
930 #else
931   if(CkMyPe()==0)
932   CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
933 #endif
934 }
935
936 /*****************************************************************************
937                         RESTART Procedure
938 *****************************************************************************/
939
940 // master processor of two buddies
941 inline int CkMemCheckPT::isMaster(int buddype)
942 {
943 #if 0
944   int mype = CkMyPe();
945 //CkPrintf("ismaster: %d %d\n", pe, mype);
946   if (CkNumPes() - totalFailed() == 2) {
947     return mype > buddype;
948   }
949   for (int i=1; i<CkNumPes(); i++) {
950     int me = (buddype+i)%CkNumPes();
951     if (isFailed(me)) continue;
952     if (me == mype) return 1;
953     else return 0;
954   }
955   return 0;
956 #else
957     // smaller one
958   int mype = CkMyPe();
959 //CkPrintf("ismaster: %d %d\n", pe, mype);
960   if (CkNumPes() - totalFailed() == 2) {
961     return mype < buddype;
962   }
963 #if NODE_CHECKPOINT
964   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
965   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
966 #else
967   for (int i=1; i<CkNumPes(); i++) {
968 #endif
969     int me = (mype+i)%CkNumPes();
970     if (isFailed(me)) continue;
971     if (me == buddype) return 1;
972     else return 0;
973   }
974   return 0;
975 #endif
976 }
977
978
979
980 #if 0
981 // helper class to pup all elements that belong to same ckLocMgr
982 class ElementDestoryer : public CkLocIterator {
983 private:
984         CkLocMgr *locMgr;
985 public:
986         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
987         void addLocation(CkLocation &loc) {
988                 CkArrayIndex idx=loc.getIndex();
989                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
990                 loc.destroy();
991         }
992 };
993 #endif
994
995 // restore the bitmap vector for LB
996 void CkMemCheckPT::resetLB(int diepe)
997 {
998 #if CMK_LBDB_ON
999   int i;
1000   char *bitmap = new char[CkNumPes()];
1001   // set processor available bitmap
1002   get_avail_vector(bitmap);
1003
1004   for (i=0; i<failedPes.length(); i++)
1005     bitmap[failedPes[i]] = 0; 
1006   bitmap[diepe] = 0;
1007
1008 #if CK_NO_PROC_POOL
1009   set_avail_vector(bitmap);
1010 #endif
1011
1012   // if I am the crashed pe, rebuild my failedPEs array
1013   if (CkMyNode() == diepe)
1014     for (i=0; i<CkNumPes(); i++) 
1015       if (bitmap[i]==0) failed(i);
1016
1017   delete [] bitmap;
1018 #endif
1019 }
1020
1021 static double restartT;
1022
1023 // in case when failedPe dies, everybody go through its checkpoint table:
1024 // destory all array elements
1025 // recover lost buddies
1026 // reconstruct all array elements from check point data
1027 // called on all processors
1028 void CkMemCheckPT::restart(int diePe)
1029 {
1030 #if CMK_MEM_CHECKPOINT
1031   double curTime = CmiWallTimer();
1032   if (CkMyPe() == diePe){
1033            restartT = CmiWallTimer();
1034     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1035   }
1036   stage = (char*)"resetLB";
1037   startTime = curTime;
1038   if (CkMyPe() == diePe)
1039     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1040
1041 #if CK_NO_PROC_POOL
1042   failed(diePe);        // add into the list of failed pes
1043 #endif
1044   thisFailedPe = diePe;
1045
1046   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1047
1048   inRestarting = 1;
1049                                                                                 
1050   // disable load balancer's barrier
1051   //if (CkMyPe() != diePe) resetLB(diePe);
1052
1053   CKLOCMGR_LOOP(mgr->startInserting(););
1054
1055
1056   if(CmiNumPartition()==1){
1057         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1058   }else{
1059         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1060   }
1061 #endif
1062 }
1063
1064 // loally remove all array elements
1065 void CkMemCheckPT::removeArrayElements()
1066 {
1067 #if CMK_MEM_CHECKPOINT
1068   int len = ckTable.length();
1069   double curTime = CmiWallTimer();
1070   if (CkMyPe() == thisFailedPe) 
1071     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1072   stage = (char*)"removeArrayElements";
1073   startTime = curTime;
1074
1075 //  if (cpCallback.isInvalid()) 
1076 //        CkPrintf("invalid pe %d\n",CkMyPe());
1077 //        CkAbort("Didn't set restart callback\n");;
1078   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1079
1080   // get rid of all buffering and remote recs
1081   // including destorying all array elements
1082 #if CK_NO_PROC_POOL  
1083         CKLOCMGR_LOOP(mgr->flushAllRecs(););
1084 #else
1085         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1086 #endif
1087   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1088 #endif
1089 }
1090
1091 // flush state in reduction manager
1092 void CkMemCheckPT::resetReductionMgr()
1093 {
1094   if (CkMyPe() == thisFailedPe) 
1095     CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1096   int numGroups = CkpvAccess(_groupIDTable)->size();
1097   for(int i=0;i<numGroups;i++) {
1098     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1099     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1100     obj->flushStates();
1101     obj->ckJustMigrated();
1102   }
1103   // reset again
1104   //CpvAccess(_qd)->flushStates();
1105   if(CmiNumPartition()==1){
1106         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1107   }
1108   else
1109         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1110 }
1111
1112 // recover the lost buddies
1113 void CkMemCheckPT::recoverBuddies()
1114 {
1115   int idx;
1116   int len = ckTable.length();
1117   // ready to flush reduction manager
1118   // cannot be CkMemCheckPT::restart because destory will modify states
1119   double curTime = CmiWallTimer();
1120   if (CkMyPe() == thisFailedPe)
1121   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1122   stage = (char *)"recoverBuddies";
1123   if (CkMyPe() == thisFailedPe)
1124   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1125   startTime = curTime;
1126
1127   // recover buddies
1128   expectCount = 0;
1129 #if !CMK_CHKP_ALL
1130   for (idx=0; idx<len; idx++) {
1131     CkCheckPTInfo *entry = ckTable[idx];
1132     if (entry->pNo == thisFailedPe) {
1133 #if CK_NO_PROC_POOL
1134       // find a new buddy
1135       int budPe = BuddyPE(CkMyPe());
1136 #else
1137       int budPe = thisFailedPe;
1138 #endif
1139       CkArrayCheckPTMessage *msg = entry->getCopy();
1140       msg->bud1 = budPe;
1141       msg->bud2 = CkMyPe();
1142       msg->cp_flag = 0;            // not checkpointing
1143       thisProxy[budPe].recoverEntry(msg);
1144       expectCount ++;
1145     }
1146   }
1147 #else
1148   //send to failed pe
1149   if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1150 #if CK_NO_PROC_POOL
1151       // find a new buddy
1152       int budPe = BuddyPE(CkMyPe());
1153 #else
1154       int budPe = thisFailedPe;
1155 #endif
1156       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1157       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1158           msg->cp_flag = 0;            // not checkpointing
1159       msg->bud1 = budPe;
1160       msg->bud2 = CkMyPe();
1161       thisProxy[budPe].recoverEntry(msg);
1162       expectCount ++;
1163   }
1164 #endif
1165
1166   if (expectCount == 0) {
1167           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1168   }
1169   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1170 }
1171
1172 void CkMemCheckPT::gotData()
1173 {
1174   ackCount ++;
1175   if (ackCount == expectCount) {
1176     ackCount = 0;
1177     expectCount = -1;
1178     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1179     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1180   }
1181 }
1182
1183 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1184 {
1185
1186   for (int i=0; i<n; i++) {
1187     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1188     mgr->updateLocation(idx[i], nowOnPe);
1189   }
1190         thisProxy[nowOnPe].gotReply();
1191 }
1192
1193 // restore array elements
1194 void CkMemCheckPT::recoverArrayElements()
1195 {
1196   double curTime = CmiWallTimer();
1197   int len = ckTable.length();
1198   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1199   stage = (char *)"recoverArrayElements";
1200   if (CkMyPe() == thisFailedPe)
1201   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1202   startTime = curTime;
1203  int flag = 0;
1204   // recover all array elements
1205   int count = 0;
1206
1207 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1208   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1209   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1210 #endif
1211
1212 #if !CMK_CHKP_ALL
1213   for (int idx=0; idx<len; idx++)
1214   {
1215     CkCheckPTInfo *entry = ckTable[idx];
1216 #if CK_NO_PROC_POOL
1217     // the bigger one will do 
1218 //    if (CkMyPe() < entry->pNo) continue;
1219     if (!isMaster(entry->pNo)) continue;
1220 #else
1221     // smaller one do it, which has the original object
1222     if (CkMyPe() == entry->pNo+1 || 
1223         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1224 #endif
1225 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1226
1227     entry->updateBuddy(CkMyPe(), entry->pNo);
1228     CkArrayCheckPTMessage *msg = entry->getCopy();
1229     // gzheng
1230     //thisProxy[CkMyPe()].inmem_restore(msg);
1231     inmem_restore(msg);
1232 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1233     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1234     int homePe = mgr->homePe(msg->index);
1235     if (homePe != CkMyPe()) {
1236       gmap[homePe].push_back(msg->locMgr);
1237       imap[homePe].push_back(msg->index);
1238     }
1239 #endif
1240     CkFreeMsg(msg);
1241     count ++;
1242   }
1243 #else
1244         double * packData;
1245         if(CmiNumPartition()==1){
1246                 CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1247                 packData = msg->packData;
1248         }
1249         else{
1250                 int pointer = CpvAccess(curPointer);
1251                 CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1252                 packData = msg->packData;
1253         }
1254 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1255         recoverAll(packData,gmap,imap);
1256 #else
1257         recoverAll(packData);
1258 #endif
1259 #endif
1260   curTime = CmiWallTimer();
1261   //if (CkMyPe() == thisFailedPe)
1262   if (CkMyPe() == 0)
1263         CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
1264 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1265   for (int i=0; i<CkNumPes(); i++) {
1266     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1267       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1268         flag++; 
1269         }
1270   }
1271   delete [] imap;
1272   delete [] gmap;
1273 #endif
1274   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1275
1276   CKLOCMGR_LOOP(mgr->doneInserting(););
1277
1278   // _crashedNode = -1;
1279   CpvAccess(_crashedNode) = -1;
1280   inRestarting = 0;
1281 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1282   if (CkMyPe() == 0)
1283     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1284 #else
1285 if(flag == 0)
1286 {
1287     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1288 }
1289 #endif
1290 }
1291
1292 void CkMemCheckPT::gotReply(){
1293     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1294 }
1295
1296 void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1297 #if CMK_CHKP_ALL
1298         PUP::fromMem p(packData);
1299         int numElements;
1300         p|numElements;
1301         if(p.isUnpacking()){
1302                 for(int i=0;i<numElements;i++){
1303                         CkGroupID gID;
1304                         CkArrayIndex idx;
1305                         p|gID;
1306                         p|idx;
1307                         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1308                         int homePe = mgr->homePe(idx);
1309 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1310                         mgr->resume(idx,p,CmiTrue,CmiTrue);
1311 #else
1312                         if(CmiNumPartition()==1)
1313                                 mgr->resume(idx,p,CmiFalse,CmiTrue);    
1314                         else{
1315                                 if(CkMyPe()==thisFailedPe){
1316                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1317                                 }
1318                         else{
1319                                         mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
1320                                 }
1321                         }
1322 #endif
1323 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1324                         homePe = mgr->homePe(idx);
1325                         if (homePe != CkMyPe()) {
1326                           gmap[homePe].push_back(gID);
1327                           imap[homePe].push_back(idx);
1328                         }
1329 #endif
1330                 }
1331         }
1332 #endif
1333 }
1334
1335
1336 // on every processor
1337 // turn load balancer back on
1338 void CkMemCheckPT::finishUp()
1339 {
1340   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1341   //CKLOCMGR_LOOP(mgr->doneInserting(););
1342   
1343   if (CkMyPe() == thisFailedPe)
1344   {
1345        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1346        cpCallback.send();
1347        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1348   }
1349         
1350 #if CMK_CONVERSE_MPI    
1351   if(CmiNumPartition()!=1){
1352         CpvAccess(recvdProcChkp) = 0;
1353         CpvAccess(recvdArrayChkp) = 0;
1354         CpvAccess(curPointer)^=1;
1355         //notify my replica, restart is done
1356    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1357    CmiSetHandler(msg,replicaRecoverHandlerIdx);
1358    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1359   }
1360    if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1361          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1362    }
1363 #endif
1364
1365 #if CK_NO_PROC_POOL
1366 #if NODE_CHECKPOINT
1367   int numnodes = CmiNumPhysicalNodes();
1368 #else
1369   int numnodes = CkNumPes();
1370 #endif
1371   if (numnodes-totalFailed() <=2) {
1372     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1373     _memChkptOn = 0;
1374   }
1375 #endif
1376 }
1377
1378 void CkMemCheckPT::recoverFromSoftFailure()
1379 {
1380         inRestarting = 0;
1381         if(CkMyPe()==0)
1382         cpCallback.send();
1383 }
1384 // called only on 0
1385 void CkMemCheckPT::quiescence(CkCallback &cb)
1386 {
1387   static int pe_count = 0;
1388   pe_count ++;
1389   CmiAssert(CkMyPe() == 0);
1390   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1391   if (pe_count == CkNumPes()) {
1392     pe_count = 0;
1393     cb.send();
1394   }
1395 }
1396
1397 // User callable function - to start a checkpoint
1398 // callback cb is used to pass control back
1399 void CkStartMemCheckpoint(CkCallback &cb)
1400 {
1401 #if CMK_MEM_CHECKPOINT
1402         CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1403   if (_memChkptOn == 0) {
1404     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1405     cb.send();
1406     return;
1407   }
1408   if (CkInRestarting()) {
1409       // trying to checkpointing during restart
1410     cb.send();
1411     return;
1412   }
1413     // store user callback and user data
1414   CkMemCheckPT::cpCallback = cb;
1415
1416     // broadcast to start check pointing
1417   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1418   checkptMgr.doItNow(CkMyPe(), cb);
1419 #else
1420   // when mem checkpoint is disabled, invike cb immediately
1421   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1422   cb.send();
1423 #endif
1424 }
1425
1426 void CkRestartCheckPoint(int diePe)
1427 {
1428 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1429   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1430   // broadcast
1431   checkptMgr.restart(diePe);
1432 }
1433
1434 static int _diePE = -1;
1435
1436 // callback function used locally by ccs handler
1437 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1438 {
1439 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1440   CkRestartCheckPoint(_diePE);
1441 }
1442
1443
1444 // called on crashed PE
1445 static void restartBeginHandler(char *msg)
1446 {
1447   CmiFree(msg);
1448 #if CMK_MEM_CHECKPOINT
1449 #if CMK_USE_BARRIER
1450         if(CkMyPe()!=_diePE){
1451                 printf("restar begin on %d\n",CkMyPe());
1452                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1453                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1454                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1455         }else{
1456         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1457         CkRestartCheckPointCallback(NULL, NULL);
1458         }
1459 #else
1460   static int count = 0;
1461   CmiAssert(CkMyPe() == _diePE);
1462   count ++;
1463   if (count == CkNumPes()) {
1464           printf("restart begin on %d\n",CkMyPe());
1465     CkRestartCheckPointCallback(NULL, NULL);
1466     count = 0;
1467   }
1468 #endif
1469 #endif
1470 }
1471
1472 extern void _discard_charm_message();
1473 extern void _resume_charm_message();
1474
1475 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1476         return data;
1477 }
1478
1479 static void restartBcastHandler(char *msg)
1480 {
1481 #if CMK_MEM_CHECKPOINT
1482   // advance phase counter
1483   CkMemCheckPT::inRestarting = 1;
1484   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1485   // gzheng
1486   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1487
1488   if (CkMyPe()==_diePE)
1489     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1490
1491   // reset QD counters
1492 /*  gzheng
1493   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1494 */
1495
1496 /*  gzheng
1497   if (CkMyPe()==_diePE)
1498       CkRestartCheckPointCallback(NULL, NULL);
1499 */
1500   CmiFree(msg);
1501
1502   _resume_charm_message();
1503
1504     // reduction
1505   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1506   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1507 #if CMK_USE_BARRIER
1508         //CmiPrintf("before reduce\n"); 
1509         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1510         //CmiPrintf("after reduce\n");  
1511 #else
1512   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1513 #endif 
1514  checkpointed = 0;
1515 #endif
1516 }
1517
1518 extern void _initDone();
1519
1520 bool compare(char * buf1, char *buf2){
1521         //buf1 my copy, buf2 from another one 
1522 //      CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
1523         PUP::checker pchecker(buf1,buf2);
1524         pchecker.skip();
1525         
1526         int numElements;
1527         pchecker|numElements;
1528         CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
1529         for(int i=0;i<numElements;i++){
1530         //for(int i=0;i<1;i++){
1531                 CkGroupID gID;
1532                 CkArrayIndex idx;
1533                 
1534                 pchecker|gID;
1535                 pchecker|idx;
1536                 
1537                 CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
1538                 CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1539                 mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1540                 CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
1541         }
1542         if(pchecker.getResult()){
1543                 CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
1544         }else{
1545                 CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
1546         }
1547         return pchecker.getResult();
1548 }
1549
1550 static void recvRemoteChkpHandler(char *msg){
1551    envelope *env = (envelope *)msg;
1552    CkUnpackMessage(&env);
1553    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1554   if(CpvAccess(recvdLocal)==1){
1555           int pointer = CpvAccess(curPointer);
1556           int size = CpvAccess(chkpBuf)[pointer]->len;
1557           CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1558           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1559                         checkptMgr[CkMyPe()].doneComparison(true);
1560           }else
1561           {
1562                   CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
1563                         checkptMgr[CkMyPe()].doneComparison(false);
1564           }
1565   }else{
1566           CpvAccess(recvdRemote) = 1;
1567           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1568           CpvAccess(buddyBuf) = chkpMsg;
1569   }  
1570 }
1571
1572 static void replicaRecoverHandler(char *msg){
1573         CpvAccess(_remoteCrashedNode) = -1;
1574         CkMemCheckPT::replicaAlive = 1;
1575     bool ret = true;
1576         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1577         checkptMgr[CkMyPe()].doneComparison(ret);
1578 }
1579
1580 static void replicaDieHandler(char * msg){
1581 #if CMK_CONVERSE_MPI    
1582         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1583         CpvAccess(_remoteCrashedNode) = diePe;
1584         CkMemCheckPT::replicaAlive = 0;
1585         find_spare_mpirank(diePe,CmiMyPartition()^1);
1586     if(CkMyPe()==diePe){
1587       CkPrintf("pe %d in replicad word die\n",diePe);
1588             CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1589     }
1590 #endif
1591         //broadcast to my partition
1592         //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1593         //checkptMgr.notifyReplicaDie(diePe);
1594     //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1595     //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1596 }
1597
1598
1599 static void replicaDieBcastHandler(char *msg){
1600         int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1601         CpvAccess(_remoteCrashedNode) = diePe;
1602         CkMemCheckPT::replicaAlive = 0;
1603 }
1604
1605 static void recoverRemoteProcDataHandler(char *msg){
1606    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1607    envelope *env = (envelope *)msg;
1608    CkUnpackMessage(&env);
1609    CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1610         
1611    //store the checkpoint
1612         int pointer = procMsg->pointer;
1613
1614
1615    if(CkMyPe()==CpvAccess(_crashedNode)){
1616            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1617            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1618            PUP::fromMem p(procMsg->packData);
1619            _handleProcData(p,CmiTrue);
1620            _initDone();
1621    }
1622    else{
1623            if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1624            CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1625            //_handleProcData(p,CmiFalse);
1626    }
1627    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1628    CKLOCMGR_LOOP(mgr->startInserting(););
1629    
1630    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1631    CpvAccess(recvdProcChkp) =1;
1632         if(CpvAccess(recvdArrayChkp)==1){
1633                 _resume_charm_message();
1634                 _diePE = CpvAccess(_crashedNode);
1635                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1636                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1637                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1638         }
1639 }
1640
1641 static void recoverRemoteArrayDataHandler(char *msg){
1642   if(CkMyPe()==CpvAccess(_crashedNode)) 
1643   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
1644    envelope *env = (envelope *)msg;
1645    CkUnpackMessage(&env);
1646    CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1647         
1648    //store the checkpoint
1649         int pointer = chkpMsg->pointer;
1650         CpvAccess(curPointer) = pointer;
1651         if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1652         CpvAccess(chkpBuf)[pointer] = chkpMsg;
1653    CpvAccess(recvdArrayChkp) =1;
1654         CkMemCheckPT::inRestarting = 1;
1655         if(CpvAccess(recvdProcChkp) == 1){
1656                 _resume_charm_message();
1657                 _diePE = CpvAccess(_crashedNode);
1658                 //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1659                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1660                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1661                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1662         }
1663 }
1664
1665 static void recvPhaseHandler(char * msg)
1666 {
1667         CpvAccess(_curRestartPhase)--;
1668         CkMemCheckPT::inRestarting = 1;
1669         //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
1670   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1671   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
1672   //     if(CmiMyPartition()==1&&CkMyPe()==2){
1673 //              CmiPrintf("start ping check handler\n");
1674 //       }
1675         // CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1676   // }
1677    //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1678    //CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1679
1680 }
1681 // called on crashed processor
1682 static void recoverProcDataHandler(char *msg)
1683 {
1684 #if CMK_MEM_CHECKPOINT
1685    int i;
1686    envelope *env = (envelope *)msg;
1687    CkUnpackMessage(&env);
1688    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1689    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1690    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1691    PUP::fromMem p(procMsg->packData);
1692    _handleProcData(p,CmiTrue);
1693
1694    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1695    // gzheng
1696    CKLOCMGR_LOOP(mgr->startInserting(););
1697
1698    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1699    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1700    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1701    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1702
1703    _initDone();
1704 //   CpvAccess(_qd)->flushStates();
1705    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1706 #endif
1707 }
1708 //for replica, got the phase number from my neighbor processor in the current partition
1709 static void askPhaseHandler(char *msg)
1710 {
1711 #if CMK_MEM_CHECKPOINT
1712         CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1713     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1714         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1715     CmiSetHandler(msg, recvPhaseHandlerIdx);
1716         CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1717 #endif
1718 }
1719 // called on its backup processor
1720 // get backup message buffer and sent to crashed processor
1721 static void askProcDataHandler(char *msg)
1722 {
1723 #if CMK_MEM_CHECKPOINT
1724     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1725     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1726     if (CpvAccess(procChkptBuf) == NULL)  {
1727       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1728       CkAbort("no checkpoint found");
1729     }
1730     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1731     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1732
1733     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1734
1735     CkPackMessage(&env);
1736     CmiSetHandler(env, recoverProcDataHandlerIdx);
1737     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1738     CpvAccess(procChkptBuf) = NULL;
1739     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1740 #endif
1741 }
1742
1743 // called on PE 0
1744 void qd_callback(void *m)
1745 {
1746    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1747    CkFreeMsg(m);
1748    if(CmiNumPartition()==1){
1749 #ifdef CMK_SMP
1750            for(int i=0;i<CmiMyNodeSize();i++){
1751            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1752            *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1753                 CmiSetHandler(msg, askProcDataHandlerIdx);
1754                 int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1755                 CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1756            }
1757            return;
1758 #endif
1759            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1760            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1761            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1762            CmiSetHandler(msg, askProcDataHandlerIdx);
1763            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1764            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1765         }
1766    else{
1767                 char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1768                 CmiSetHandler(msg, recvPhaseHandlerIdx);
1769                 CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
1770    }
1771 }
1772
1773 // on crashed node
1774 void CkMemRestart(const char *dummy, CkArgMsg *args)
1775 {
1776 #if CMK_MEM_CHECKPOINT
1777    _diePE = CmiMyNode();
1778    CpvAccess( _crashedNode )= CmiMyNode();
1779    CkMemCheckPT::inRestarting = 1;
1780    _discard_charm_message();
1781   
1782   /*if(CmiMyRank()==0){
1783     CkCallback cb(qd_callback);
1784     CkStartQD(cb);
1785     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1786   }*/
1787    if(CmiNumPartition()==1){
1788            CkMemCheckPT::startTime = restartT = CmiWallTimer();
1789                 CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1790                 restartT = CmiWallTimer();
1791            CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
1792            char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1793            *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1794            // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1795            CmiSetHandler(msg, askProcDataHandlerIdx);
1796            int pe = ChkptOnPe(CpvAccess(_crashedNode));
1797            CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1798    }
1799    else{
1800                 CkCallback cb(qd_callback);
1801                 CkStartQD(cb);
1802    }
1803 #else
1804    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1805 #endif
1806 }
1807
1808 // can be called in other files
1809 // return true if it is in restarting
1810 extern "C"
1811 int CkInRestarting()
1812 {
1813 #if CMK_MEM_CHECKPOINT
1814   if (CpvAccess( _crashedNode)!=-1) return 1;
1815   // gzheng
1816   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1817   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1818   return CkMemCheckPT::inRestarting;
1819 #else
1820   return 0;
1821 #endif
1822 }
1823
1824 extern "C"
1825 int CkReplicaAlive()
1826 {
1827         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
1828           CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1829         return CkMemCheckPT::replicaAlive;
1830
1831         /*if(CkMemCheckPT::replicaDead==1)
1832                 return 0;
1833         else            
1834                 return 1;*/
1835 }
1836
1837 extern "C"
1838 int CkInCheckpointing()
1839 {
1840         return CkMemCheckPT::inCheckpointing;
1841 }
1842
1843 extern "C"
1844 void CkSetInLdb(){
1845 #if CMK_MEM_CHECKPOINT
1846         CkMemCheckPT::inLoadbalancing = 1;
1847 #endif
1848 }
1849
1850 extern "C"
1851 int CkInLdb(){
1852 #if CMK_MEM_CHECKPOINT
1853         return CkMemCheckPT::inLoadbalancing;
1854 #endif
1855         return 0;
1856 }
1857
1858 extern "C"
1859 void CkResetInLdb(){
1860 #if CMK_MEM_CHECKPOINT
1861         CkMemCheckPT::inLoadbalancing = 0;
1862 #endif
1863 }
1864
1865 /*****************************************************************************
1866                 module initialization
1867 *****************************************************************************/
1868
1869 static int arg_where = CkCheckPoint_inMEM;
1870
1871 #if CMK_MEM_CHECKPOINT
1872 void init_memcheckpt(char **argv)
1873 {
1874     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1875       arg_where = CkCheckPoint_inDISK;
1876     }
1877
1878         // initiliazing _crashedNode variable
1879         CpvInitialize(int, _crashedNode);
1880         CpvInitialize(int, _remoteCrashedNode);
1881         CpvAccess(_crashedNode) = -1;
1882         CpvAccess(_remoteCrashedNode) = -1;
1883 }
1884 #endif
1885
1886 class CkMemCheckPTInit: public Chare {
1887 public:
1888   CkMemCheckPTInit(CkArgMsg *m) {
1889 #if CMK_MEM_CHECKPOINT
1890     if (arg_where == CkCheckPoint_inDISK) {
1891       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1892     }
1893     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1894     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1895 #endif
1896   }
1897 };
1898
1899 static void notifyHandler(char *msg)
1900 {
1901 #if CMK_MEM_CHECKPOINT
1902   CmiFree(msg);
1903       /* immediately increase restart phase to filter old messages */
1904   CpvAccess(_curRestartPhase) ++;
1905   CpvAccess(_qd)->flushStates();
1906   _discard_charm_message();
1907
1908 #endif
1909 }
1910
1911 extern "C"
1912 void notify_crash(int node)
1913 {
1914 #ifdef CMK_MEM_CHECKPOINT
1915   CpvAccess( _crashedNode) = node;
1916 #ifdef CMK_SMP
1917   for(int i=0;i<CkMyNodeSize();i++){
1918         CpvAccessOther(_crashedNode,i)=node;
1919   }
1920 #endif
1921   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1922   CkMemCheckPT::inRestarting = 1;
1923
1924     // this may be in interrupt handler, send a message to reset QD
1925   int pe = CmiNodeFirst(CkMyNode());
1926   for(int i=0;i<CkMyNodeSize();i++){
1927         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1928         CmiSetHandler(msg, notifyHandlerIdx);
1929         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1930   }
1931 #endif
1932 }
1933
1934 extern "C" void (*notify_crash_fn)(int node);
1935
1936
1937 #if CMK_CONVERSE_MPI
1938 void buddyDieHandler(char *msg)
1939 {
1940 #if CMK_MEM_CHECKPOINT
1941    // notify
1942         CkMemCheckPT::inRestarting = 1;
1943    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1944    notify_crash(diepe);
1945    // send message to crash pe to let it restart
1946    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1947    CkPrintf("[%d] finding newrank\n",CkMyPe());
1948    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
1949    CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
1950    int buddy = obj->BuddyPE(CmiMyPe());
1951    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
1952    if (buddy == diepe)  {
1953      mpi_restart_crashed(diepe, newrank);
1954    }
1955 #endif
1956 }
1957
1958 void pingHandler(void *msg)
1959 {
1960   lastPingTime = CmiWallTimer();
1961   CmiFree(msg);
1962 }
1963
1964 void pingCheckHandler()
1965 {
1966 #if CMK_MEM_CHECKPOINT
1967   double now = CmiWallTimer();
1968   if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
1969   //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
1970     int i, pe, buddy;
1971     // tell everyone the buddy dies
1972     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1973     for (i = 1; i < CmiNumPes(); i++) {
1974        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1975        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1976     }
1977     buddy = pe;
1978     CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
1979     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
1980       if (obj->isFailed(pe) || pe == buddy) continue;
1981       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1982       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1983       CmiSetHandler(msg, buddyDieHandlerIdx);
1984       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1985     }*/
1986     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1987     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1988     CmiSetHandler(msg, buddyDieHandlerIdx);
1989     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1990     //send to everyone in the other world
1991         if(CmiNumPartition()!=1){
1992                 for(int i=0;i<CmiNumPes();i++){
1993                   char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1994                   *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
1995                   CmiSetHandler(rMsg, replicaDieHandlerIdx);
1996                   CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
1997                 }
1998         }
1999   }
2000   else 
2001     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2002 #endif
2003 }
2004
2005 void pingBuddy()
2006 {
2007 #if CMK_MEM_CHECKPOINT
2008   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2009   if (obj) {
2010     int buddy = obj->BuddyPE(CkMyPe());
2011     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2012     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2013     CmiSetHandler(msg, pingHandlerIdx);
2014     CmiGetRestartPhase(msg) = 9999;
2015     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2016   }
2017   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2018 #endif
2019 }
2020 #endif
2021
2022 // initproc
2023 void CkRegisterRestartHandler( )
2024 {
2025 #if CMK_MEM_CHECKPOINT
2026   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2027   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2028   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2029   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2030   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2031   
2032   //for replica
2033   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2034   replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2035   replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2036   replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2037   askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2038   recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2039   recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2040   recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2041
2042 #if CMK_CONVERSE_MPI
2043   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2044   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2045   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2046 #endif
2047
2048   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2049   CpvInitialize(CkCheckPTMessage **, chkpBuf);
2050   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2051   CpvInitialize(CkCheckPTMessage *, buddyBuf);
2052   CpvInitialize(int,curPointer);
2053   CpvInitialize(int,recvdLocal);
2054   CpvInitialize(int,recvdRemote);
2055   CpvInitialize(int,recvdProcChkp);
2056   CpvInitialize(int,recvdArrayChkp);
2057
2058   CpvAccess(procChkptBuf) = NULL;
2059   CpvAccess(buddyBuf) = NULL;
2060   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2061   CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2062   CpvAccess(chkpBuf)[0] = NULL;
2063   CpvAccess(chkpBuf)[1] = NULL;
2064   CpvAccess(localProcChkpBuf)[0] = NULL;
2065   CpvAccess(localProcChkpBuf)[1] = NULL;
2066   
2067   CpvAccess(curPointer) = 0;
2068   CpvAccess(recvdLocal) = 0;
2069   CpvAccess(recvdRemote) = 0;
2070   CpvAccess(recvdProcChkp) = 0;
2071   CpvAccess(recvdArrayChkp) = 0;
2072
2073   notify_crash_fn = notify_crash;
2074
2075 #if ! CMK_CONVERSE_MPI
2076   // print pid to kill
2077 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2078 //  sleep(4);
2079 #endif
2080 #endif
2081 }
2082
2083
2084 extern "C"
2085 int CkHasCheckpoints()
2086 {
2087   return checkpointed;
2088 }
2089
2090 /// @todo: the following definitions should be moved to a separate file containing
2091 // structures and functions about fault tolerance strategies
2092
2093 /**
2094  *  * @brief: function for killing a process                                             
2095  *   */
2096 #ifdef CMK_MEM_CHECKPOINT
2097 #if CMK_HAS_GETPID
2098 void killLocal(void *_dummy,double curWallTime){
2099         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2100         if(CmiWallTimer()<killTime-1){
2101                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2102         }else{ 
2103 #if CMK_CONVERSE_MPI
2104                                 CkDieNow();
2105 #else 
2106                 kill(getpid(),SIGKILL);                                               
2107 #endif
2108         }              
2109
2110 #else
2111 void killLocal(void *_dummy,double curWallTime){
2112   CmiAbort("kill() not supported!");
2113 }
2114 #endif
2115 #endif
2116
2117 #ifdef CMK_MEM_CHECKPOINT
2118 /**
2119  * @brief: reads the file with the kill information
2120  */
2121 void readKillFile(){
2122         FILE *fp=fopen(killFile,"r");
2123         if(!fp){
2124                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2125                 return;
2126         }
2127         int proc;
2128         double sec;
2129         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2130                 if(proc == CkMyNode() && CkMyRank() == 0){
2131                         killTime = CmiWallTimer()+sec;
2132                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2133                         CcdCallFnAfter(killLocal,NULL,sec*1000);
2134                 }
2135         }
2136         fclose(fp);
2137 }
2138
2139 #if ! CMK_CONVERSE_MPI
2140 void CkDieNow()
2141 {
2142 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2143          // ignored for non-mpi version
2144         CmiPrintf("[%d] die now.\n", CmiMyPe());
2145         killTime = CmiWallTimer()+0.001;
2146         CcdCallFnAfter(killLocal,NULL,1);
2147 #endif
2148 }
2149 #endif
2150
2151 #endif
2152
2153 #include "CkMemCheckpoint.def.h"
2154
2155