optimize chkp after ldb
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 072a5f6b0eaa2240ac163f17411734c17b9c9ac1..ee7881eda5fbb6bf210f51362715f4832c89eda9 100644 (file)
@@ -65,6 +65,7 @@ void noopck(const char*, ...)
 #define CK_NO_PROC_POOL                                0
 #endif
 
+#define CMK_CHKP_ALL           1  // compile-time switch: non-zero selects the "#if CMK_CHKP_ALL" code paths in this file, zero selects the "#if !CMK_CHKP_ALL" (ckTable-based) paths
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 
@@ -92,6 +93,21 @@ int killFlag=0;
 double killTime=0.0;
 #endif
 
+#ifdef CKLOCMGR_LOOP
+#undef CKLOCMGR_LOOP
+#endif
+// CKLOCMGR_LOOP(code): walk every entry of _groupIDTable, look the group object up in _groupTable and, for each object whose isLocMgr() is true, run "code" with that object bound to the local `CkLocMgr *mgr`. The expansion also declares the locals numGroups, i and obj. NOTE(review): obj is dereferenced without a null check -- confirm getObj() cannot return NULL here.
+#define  CKLOCMGR_LOOP(code)   {       \
+  int numGroups = CkpvAccess(_groupIDTable)->size();   \
+  for(int i=0;i<numGroups;i++) {       \
+    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
+    if(obj->isLocMgr())  {     \
+      CkLocMgr *mgr = (CkLocMgr*)obj;  \
+      code     \
+    }  \
+  }    \
+ }
+
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
 
@@ -430,12 +446,17 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index,
 
 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)  // receives a checkpoint copy during recovery, stores it, and acknowledges the sender with gotData()
 {
+#if !CMK_CHKP_ALL      // per-element path: one table entry per message
   int buddy = msg->bud1;  // the buddy is whichever of bud1/bud2 is not this PE
   if (buddy == CkMyPe()) buddy = msg->bud2;
   createEntry(msg->aid, msg->locMgr, msg->index, buddy);  // create the entry for this element before handing it the data
   recvData(msg);
     // ack: notify the buddy through gotData()
   thisProxy[buddy].gotData();
+#else
+  recvArrayCheckpoint(msg);  // bulk path: msg is stored in chkpTable by recvArrayCheckpoint()
+  thisProxy[msg->bud2].gotData();  // ack goes to msg->bud2 (recoverBuddies sets bud2 to the sending PE)
+#endif
 }
 
 // loop through my checkpoint table and ask checkpointed array elements
@@ -449,7 +470,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
     startTime = CmiWallTimer();
     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
   }
-
+#if !CMK_CHKP_ALL
   int len = ckTable.length();
   for (int i=0; i<len; i++) {
     CkCheckPTInfo *entry = ckTable[i];
@@ -464,11 +485,97 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
   }
     // if my table is empty, then I am done
   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-
+#else
+  startArrayCheckpoint();
+#endif
   // pack and send proc level data
   sendProcData();
 }
 
+class MemElementPacker : public CkLocIterator{  // CkLocIterator that pups every visited location of one CkLocMgr into a single PUP::er
+       private:
+               CkLocMgr *locMgr;  // location manager whose elements are being pupped
+               PUP::er &p;  // destination pupper, held by reference (must outlive this object)
+       public:
+               MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
+               void addLocation(CkLocation &loc){  // called once per location; emits group ID, index, then the element data
+                       CkArrayIndexMax idx = loc.getIndex();
+                       CkGroupID gID = locMgr->ckGetGroupID();
+                       ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();  // elt is used only for the non-null assertion below
+                       CmiAssert(elt);
+                       //elt = (ArrayElement *)locMgr->lookup(idx, aid);
+                       p|gID;  // record layout: gID, idx, element data -- recoverAll() reads gID and idx back in this order
+                       p|idx;
+                       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+               }
+};
+
+void CkMemCheckPT::pupAllElements(PUP::er &p){  // pups an element count followed by every element of every CkLocMgr; compiled to a no-op when CMK_CHKP_ALL is 0
+#if CMK_CHKP_ALL
+       int numElements;  // only assigned when not unpacking; when unpacking it is filled by "p | numElements"
+       if(!p.isUnpacking()){
+               numElements = CkCountArrayElements();
+       }
+       p | numElements;  // the count is written first so a reader knows how many records follow
+       if(!p.isUnpacking()){  // sizing/packing only: when unpacking, this function reads just the count
+               CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
+       }
+#endif
+}
+
+void CkMemCheckPT::startArrayCheckpoint(){  // packs all local array elements into one message, sends a copy to the buddy PE and keeps the original in chkpTable[0]; no-op when CMK_CHKP_ALL is 0
+#if CMK_CHKP_ALL
+       int size;  // packed size in bytes as reported by the sizing pass
+       {
+               PUP::sizer psizer;  // first pass: measure only
+               pupAllElements(psizer);
+               size = psizer.size();
+       }
+       int packSize = size/sizeof(double)+1;  // buffer length in sizeof(double) units, rounded up -- presumably packData is a double array, TODO confirm
+       CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
+       msg->len = size;
+       msg->cp_flag = 1;  // 1 marks a checkpointing message; recoverBuddies() sends copies with cp_flag = 0
+       // bud1 is this PE, bud2 is the PE returned by ChkptOnPe() that receives the copy
+       msg->bud1=CkMyPe();
+       msg->bud2=ChkptOnPe(CkMyPe());
+       {
+               PUP::toMem p(msg->packData);  // second pass: actually pack into the message buffer
+               pupAllElements(p);
+       }
+       thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));  // the buddy gets a copy; the original stays local
+       chkpTable[0] = msg;  // NOTE(review): the previous chkpTable[0] pointer is overwritten without being freed here -- confirm ownership
+       recvCount++;  // the local copy counts as one of the two arrivals recvArrayCheckpoint() waits for
+#endif
+}
+
+void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)  // stores an incoming packed checkpoint in chkpTable and, while checkpointing, contributes to the finishing reduction once recvCount reaches 2
+{
+#if CMK_CHKP_ALL
+       int idx = 1;  // slot 1 unless msg->bud1 is this PE, in which case slot 0
+       if(msg->bud1 == CkMyPe()){
+               idx = 0;
+       }
+       int isChkpting = msg->cp_flag;
+       chkpTable[idx] = msg;  // NOTE(review): the previous pointer in this slot is overwritten without being freed here -- confirm ownership
+       if(isChkpting){  // messages with cp_flag == 0 are only stored and do not count towards recvCount
+               recvCount++;
+               if(recvCount == 2){  // startArrayCheckpoint() also increments recvCount for the local copy
+                 if (where == CkCheckPoint_inMEM) {
+                       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+                 }
+                 else if (where == CkCheckPoint_inDISK) {
+                       // another barrier to finalize the writing using fsync
+                       CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+                       contribute(0,NULL,CkReduction::sum_int,localcb);
+                 }
+                 else
+                       CmiAbort("Unknown checkpoint scheme");
+                 recvCount = 0;  // reset the counter after contributing
+               }
+       }
+#endif
+}
+
 // don't handle array elements
 static inline void _handleProcData(PUP::er &p)
 {
@@ -572,7 +679,9 @@ void CkMemCheckPT::cpFinish()
     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
     cpCallback.send();
     peCount = 0;
+#if !CMK_CHKP_ALL
     thisProxy.report();
+#endif
   }
 }
 
@@ -632,21 +741,7 @@ inline int CkMemCheckPT::isMaster(int buddype)
 #endif
 }
 
-#ifdef CKLOCMGR_LOOP
-#undef CKLOCMGR_LOOP
-#endif
 
-// loop over all CkLocMgr and do "code"
-#define  CKLOCMGR_LOOP(code)   {       \
-  int numGroups = CkpvAccess(_groupIDTable)->size();   \
-  for(int i=0;i<numGroups;i++) {       \
-    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
-    if(obj->isLocMgr())  {     \
-      CkLocMgr *mgr = (CkLocMgr*)obj;  \
-      code     \
-    }  \
-  }    \
- }
 
 #if 0
 // helper class to pup all elements that belong to same ckLocMgr
@@ -793,38 +888,42 @@ void CkMemCheckPT::recoverBuddies()
 
   // recover buddies
   expectCount = 0;
+#if !CMK_CHKP_ALL
   for (idx=0; idx<len; idx++) {
     CkCheckPTInfo *entry = ckTable[idx];
     if (entry->pNo == thisFailedPe) {
 #if CK_NO_PROC_POOL
       // find a new buddy
-/*
-      int budPe = CkMyPe();
-//      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
-      while (budPe == CkMyPe() || isFailed(budPe)) 
-          budPe = (budPe+1)%CkNumPes();
-*/
       int budPe = BuddyPE(CkMyPe());
-      //entry->pNo = budPe;
 #else
       int budPe = thisFailedPe;
 #endif
-      entry->updateBuddy(CkMyPe(), budPe);
-#if 0
-      thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
-      CkArrayCheckPTMessage *msg = entry->getCopy();
-      msg->cp_flag = 0;            // not checkpointing
-      thisProxy[budPe].recvData(msg);
-#else
       CkArrayCheckPTMessage *msg = entry->getCopy();
       msg->bud1 = budPe;
       msg->bud2 = CkMyPe();
       msg->cp_flag = 0;            // not checkpointing
       thisProxy[budPe].recoverEntry(msg);
-#endif
       expectCount ++;
     }
   }
+#else
+  //send to failed pe
+  if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
+#if CK_NO_PROC_POOL
+      // find a new buddy
+      int budPe = BuddyPE(CkMyPe());
+#else
+      int budPe = thisFailedPe;
+#endif
+      CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
+      CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
+         msg->cp_flag = 0;            // not checkpointing
+      msg->bud1 = budPe;
+      msg->bud2 = CkMyPe();
+      thisProxy[budPe].recoverEntry(msg);
+      expectCount ++;
+  }
+#endif
 
 #if 1
   if (expectCount == 0) {
@@ -876,6 +975,7 @@ void CkMemCheckPT::recoverArrayElements()
   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
 #endif
+#if !CMK_CHKP_ALL
   for (int idx=0; idx<len; idx++)
   {
     CkCheckPTInfo *entry = ckTable[idx];
@@ -906,6 +1006,11 @@ void CkMemCheckPT::recoverArrayElements()
     CkFreeMsg(msg);
     count ++;
   }
+#else
+       CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+       recoverAll(msg,gmap,imap);
+    CkFreeMsg(msg);
+#endif
 #if STREAMING_INFORMHOME
   for (int i=0; i<CkNumPes(); i++) {
     if (gmap[i].size() && i!=CkMyPe()) {
@@ -927,6 +1032,49 @@ void CkMemCheckPT::recoverArrayElements()
     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
 }
 
+void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){  // unpacks every element record in msg->packData and resumes it through its CkLocMgr; gmap/imap are indexed by home PE; no-op when CMK_CHKP_ALL is 0
+#if CMK_CHKP_ALL
+       PUP::fromMem p(msg->packData);
+       int numElements;
+       p|numElements;  // the element count is the first item in the buffer (see pupAllElements)
+       if(p.isUnpacking()){
+               for(int i=0;i<numElements;i++){
+                       CkGroupID gID;
+                       CkArrayIndex idx;
+                       p|gID;  // each record starts with gID then idx, matching MemElementPacker::addLocation
+                       p|idx;
+                       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();  // NOTE(review): mgr is used without a null or isLocMgr() check -- confirm gID always resolves to a live CkLocMgr
+#if STREAMING_INFORMHOME
+                       mgr->resume(idx,p,CmiFalse);  // third argument differs between the two builds; presumably it controls notifying the home PE -- TODO confirm
+#else
+                       mgr->resume(idx,p,CmiTrue);
+#endif
+                         /*CkLocRec_local *rec = loc.getLocalRecord();
+                         CmiAssert(rec);
+                         CkVec<CkMigratable *> list;
+                         mgr->migratableList(rec, list);
+                         CmiAssert(list.length() > 0);
+                         for (int l=0; l<list.length(); l++) {
+                               ArrayElement * elt = (ArrayElement *)list[l];
+                               //    reset, may not needed now
+                               // for now.
+                               for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
+                                 contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
+                                 if (c) c->redNo = 0;
+                               }
+                         }*/
+#if STREAMING_INFORMHOME
+                       int homePe = mgr->homePe(idx);
+                       if (homePe != CkMyPe()) {  // record (gID, idx) under the home PE when the home is not this PE
+                         gmap[homePe].push_back(gID);
+                         imap[homePe].push_back(idx);
+                       }
+#endif
+               }
+       }
+#endif
+}
+
 static double restartT;
 
 // on every processor