optimize chkp after ldb
authorXiang Ni <xiangni2@illinois.edu>
Thu, 18 Oct 2012 07:33:24 +0000 (02:33 -0500)
committerXiang Ni <xiangni2@illinois.edu>
Thu, 18 Oct 2012 07:33:24 +0000 (02:33 -0500)
src/ck-core/ckarray.h
src/ck-core/ckcheckpoint.C
src/ck-core/ckcheckpoint.h
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index 995e6204d6cc8c3455a2dacccfc451b25974a235..a09d07834d2a726a19447be6f17b5c8d956207a7 100644 (file)
@@ -482,6 +482,7 @@ void CkBroadcastMsgSection(int entryIndex, void *msg, CkSectionID sID, int opts=
 class ArrayElement : public CkMigratable
 {
   friend class CkArray;
+
   friend class CkArrayListener;
   int numInitialElements; // Number of elements created by ckNew(numElements)
   void initBasics(void);
@@ -544,6 +545,7 @@ private:
 
 #if CMK_MEM_CHECKPOINT
 friend class CkMemCheckPT;
+friend class CkLocMgr;
 protected:
   int budPEs[2];
 private:
index 89402fd4aff9c64316b8e8074efcbf879c4f0d97..e3b513b96cdfb1ce847f63bb582b36c098e86f54 100644 (file)
@@ -542,7 +542,7 @@ void CkPupArrayElementsData(PUP::er &p, int notifyListeners)
        }
 }
 
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) ||CMK_MEM_CHECKPOINT
 int  CkCountArrayElements(){
     int numGroups = CkpvAccess(_groupIDTable)->size();
     int i;
index 7c8203a93b55c7a23affc005051668213060dfb8..6f210777d8738e227d5e735d6c353f45fac1e267 100644 (file)
@@ -74,7 +74,7 @@ void CkRemoveArrayElements();
 
 void CkStartCheckpoint(const char* dirname,const CkCallback& cb);
 void CkRestartMain(const char* dirname, CkArgMsg *args);
-#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
+#if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_)) || CMK_MEM_CHECKPOINT
 int  CkCountArrayElements();
 #endif
 
index c33807bab89b9e469e81b8b42ac708b42b2d4927..bb77ad6b882cd2b525f86f1642cf8cd50cb3fc78 100644 (file)
@@ -2627,6 +2627,24 @@ void CkLocMgr::pupElementsFor(PUP::er &p,CkLocRec_local *rec,
 #endif
                 }
        }
+#if CMK_MEM_CHECKPOINT
+       if(CkInRestarting()){
+         ArrayElement *elt;
+         CkVec<CkMigratable *> list;
+         migratableList(rec, list);
+         CmiAssert(list.length() > 0);
+         for (int l=0; l<list.length(); l++) {
+               //    reset, may not needed now
+               // for now.
+               for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
+                       ArrayElement * elt = (ArrayElement *)list[l];
+                 contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
+                 if (c) c->redNo = 0;
+               }
+         }
+               
+       }
+#endif
 }
 #endif
 
index 00076bfde4cbf66b0844f27429a803ae6a4b62be..fbb3c168ff7308c6beb783b9549733a607222cab 100644 (file)
@@ -575,6 +575,8 @@ public:
                if (managers.find(arrayID)->mgr==NULL)
                        CkAbort("CkLocMgr::lookupLocal called for unknown array!\n");
 #endif
+               if (managers.find(arrayID)->mgr==NULL)
+                       CkAbort("CkLocMgr::lookupLocal called for unknown array!\n");
                return managers.find(arrayID)->elts.get(localIdx);
        }
 
@@ -660,6 +662,7 @@ private:
 
        friend class CkLocation; //so it can call pupElementsFor
        friend class ArrayElement;
+       friend class MemElementPacker;
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
        void pupElementsFor(PUP::er &p,CkLocRec_local *rec,
         CkElementCreation_t type, CmiBool create=CmiTrue, int dummy=0);
index 072a5f6b0eaa2240ac163f17411734c17b9c9ac1..ee7881eda5fbb6bf210f51362715f4832c89eda9 100644 (file)
@@ -65,6 +65,7 @@ void noopck(const char*, ...)
 #define CK_NO_PROC_POOL                                0
 #endif
 
+#define CMK_CHKP_ALL           1
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 
@@ -92,6 +93,21 @@ int killFlag=0;
 double killTime=0.0;
 #endif
 
+#ifdef CKLOCMGR_LOOP
+#undef CKLOCMGR_LOOP
+#endif
+// loop over all CkLocMgr and do "code"
+#define  CKLOCMGR_LOOP(code)   {       \
+  int numGroups = CkpvAccess(_groupIDTable)->size();   \
+  for(int i=0;i<numGroups;i++) {       \
+    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
+    if(obj->isLocMgr())  {     \
+      CkLocMgr *mgr = (CkLocMgr*)obj;  \
+      code     \
+    }  \
+  }    \
+ }
+
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
 
@@ -430,12 +446,17 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index,
 
 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 {
+#if !CMK_CHKP_ALL      
   int buddy = msg->bud1;
   if (buddy == CkMyPe()) buddy = msg->bud2;
   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
   recvData(msg);
     // ack
   thisProxy[buddy].gotData();
+#else
+  recvArrayCheckpoint(msg);
+  thisProxy[msg->bud2].gotData();
+#endif
 }
 
 // loop through my checkpoint table and ask checkpointed array elements
@@ -449,7 +470,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
     startTime = CmiWallTimer();
     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
   }
-
+#if !CMK_CHKP_ALL
   int len = ckTable.length();
   for (int i=0; i<len; i++) {
     CkCheckPTInfo *entry = ckTable[i];
@@ -464,11 +485,97 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
   }
     // if my table is empty, then I am done
   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-
+#else
+  startArrayCheckpoint();
+#endif
   // pack and send proc level data
   sendProcData();
 }
 
+class MemElementPacker : public CkLocIterator{
+       private:
+               CkLocMgr *locMgr;
+               PUP::er &p;
+       public:
+               MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
+               void addLocation(CkLocation &loc){
+                       CkArrayIndexMax idx = loc.getIndex();
+                       CkGroupID gID = locMgr->ckGetGroupID();
+                       ArrayElement *elt = (ArrayElement *)loc.getLocalRecord();
+                       CmiAssert(elt);
+                       //elt = (ArrayElement *)locMgr->lookup(idx, aid);
+                       p|gID;
+                       p|idx;
+                       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+               }
+};
+
+void CkMemCheckPT::pupAllElements(PUP::er &p){
+#if CMK_CHKP_ALL
+       int numElements;
+       if(!p.isUnpacking()){
+               numElements = CkCountArrayElements();
+       }
+       p | numElements;
+       if(!p.isUnpacking()){
+               CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
+       }
+#endif
+}
+
+void CkMemCheckPT::startArrayCheckpoint(){
+#if CMK_CHKP_ALL
+       int size;
+       {
+               PUP::sizer psizer;
+               pupAllElements(psizer);
+               size = psizer.size();
+       }
+       int packSize = size/sizeof(double)+1;
+       CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
+       msg->len = size;
+       msg->cp_flag = 1;
+       int budPEs[2];
+       msg->bud1=CkMyPe();
+       msg->bud2=ChkptOnPe(CkMyPe());
+       {
+               PUP::toMem p(msg->packData);
+               pupAllElements(p);
+       }
+       thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
+       chkpTable[0] = msg;
+       recvCount++;
+#endif
+}
+
+void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
+{
+#if CMK_CHKP_ALL
+       int idx = 1;
+       if(msg->bud1 == CkMyPe()){
+               idx = 0;
+       }
+       int isChkpting = msg->cp_flag;
+       chkpTable[idx] = msg;
+       if(isChkpting){
+               recvCount++;
+               if(recvCount == 2){
+                 if (where == CkCheckPoint_inMEM) {
+                       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+                 }
+                 else if (where == CkCheckPoint_inDISK) {
+                       // another barrier for finalize the writing using fsync
+                       CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+                       contribute(0,NULL,CkReduction::sum_int,localcb);
+                 }
+                 else
+                       CmiAbort("Unknown checkpoint scheme");
+                 recvCount = 0;
+               }
+       }
+#endif
+}
+
 // don't handle array elements
 static inline void _handleProcData(PUP::er &p)
 {
@@ -572,7 +679,9 @@ void CkMemCheckPT::cpFinish()
     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
     cpCallback.send();
     peCount = 0;
+#if !CMK_CHKP_ALL
     thisProxy.report();
+#endif
   }
 }
 
@@ -632,21 +741,7 @@ inline int CkMemCheckPT::isMaster(int buddype)
 #endif
 }
 
-#ifdef CKLOCMGR_LOOP
-#undef CKLOCMGR_LOOP
-#endif
 
-// loop over all CkLocMgr and do "code"
-#define  CKLOCMGR_LOOP(code)   {       \
-  int numGroups = CkpvAccess(_groupIDTable)->size();   \
-  for(int i=0;i<numGroups;i++) {       \
-    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
-    if(obj->isLocMgr())  {     \
-      CkLocMgr *mgr = (CkLocMgr*)obj;  \
-      code     \
-    }  \
-  }    \
- }
 
 #if 0
 // helper class to pup all elements that belong to same ckLocMgr
@@ -793,38 +888,42 @@ void CkMemCheckPT::recoverBuddies()
 
   // recover buddies
   expectCount = 0;
+#if !CMK_CHKP_ALL
   for (idx=0; idx<len; idx++) {
     CkCheckPTInfo *entry = ckTable[idx];
     if (entry->pNo == thisFailedPe) {
 #if CK_NO_PROC_POOL
       // find a new buddy
-/*
-      int budPe = CkMyPe();
-//      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
-      while (budPe == CkMyPe() || isFailed(budPe)) 
-          budPe = (budPe+1)%CkNumPes();
-*/
       int budPe = BuddyPE(CkMyPe());
-      //entry->pNo = budPe;
 #else
       int budPe = thisFailedPe;
 #endif
-      entry->updateBuddy(CkMyPe(), budPe);
-#if 0
-      thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
-      CkArrayCheckPTMessage *msg = entry->getCopy();
-      msg->cp_flag = 0;            // not checkpointing
-      thisProxy[budPe].recvData(msg);
-#else
       CkArrayCheckPTMessage *msg = entry->getCopy();
       msg->bud1 = budPe;
       msg->bud2 = CkMyPe();
       msg->cp_flag = 0;            // not checkpointing
       thisProxy[budPe].recoverEntry(msg);
-#endif
       expectCount ++;
     }
   }
+#else
+  //send to failed pe
+  if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
+#if CK_NO_PROC_POOL
+      // find a new buddy
+      int budPe = BuddyPE(CkMyPe());
+#else
+      int budPe = thisFailedPe;
+#endif
+      CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
+      CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
+         msg->cp_flag = 0;            // not checkpointing
+      msg->bud1 = budPe;
+      msg->bud2 = CkMyPe();
+      thisProxy[budPe].recoverEntry(msg);
+      expectCount ++;
+  }
+#endif
 
 #if 1
   if (expectCount == 0) {
@@ -876,6 +975,7 @@ void CkMemCheckPT::recoverArrayElements()
   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
 #endif
+#if !CMK_CHKP_ALL
   for (int idx=0; idx<len; idx++)
   {
     CkCheckPTInfo *entry = ckTable[idx];
@@ -906,6 +1006,11 @@ void CkMemCheckPT::recoverArrayElements()
     CkFreeMsg(msg);
     count ++;
   }
+#else
+       CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+       recoverAll(msg,gmap,imap);
+    CkFreeMsg(msg);
+#endif
 #if STREAMING_INFORMHOME
   for (int i=0; i<CkNumPes(); i++) {
     if (gmap[i].size() && i!=CkMyPe()) {
@@ -927,6 +1032,49 @@ void CkMemCheckPT::recoverArrayElements()
     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
 }
 
+void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
+#if CMK_CHKP_ALL
+       PUP::fromMem p(msg->packData);
+       int numElements;
+       p|numElements;
+       if(p.isUnpacking()){
+               for(int i=0;i<numElements;i++){
+                       CkGroupID gID;
+                       CkArrayIndex idx;
+                       p|gID;
+                       p|idx;
+                       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+#if STREAMING_INFORMHOME
+                       mgr->resume(idx,p,CmiFalse);
+#else
+                       mgr->resume(idx,p,CmiTrue);
+#endif
+                         /*CkLocRec_local *rec = loc.getLocalRecord();
+                         CmiAssert(rec);
+                         CkVec<CkMigratable *> list;
+                         mgr->migratableList(rec, list);
+                         CmiAssert(list.length() > 0);
+                         for (int l=0; l<list.length(); l++) {
+                               ArrayElement * elt = (ArrayElement *)list[l];
+                               //    reset, may not needed now
+                               // for now.
+                               for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
+                                 contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
+                                 if (c) c->redNo = 0;
+                               }
+                         }*/
+#if STREAMING_INFORMHOME
+                       int homePe = mgr->homePe(idx);
+                       if (homePe != CkMyPe()) {
+                         gmap[homePe].push_back(gID);
+                         imap[homePe].push_back(idx);
+                       }
+#endif
+               }
+       }
+#endif
+}
+
 static double restartT;
 
 // on every processor
index ad8b3e1f6b80f09a7b57bba28bfacd106b93c91e..742c89a5fc54841f9999d9b83b6e29f86064ca22 100644 (file)
@@ -18,6 +18,7 @@ module CkMemCheckpoint {
        // checkpointing
         entry [expedited] void doItNow(int spe, CkCallback &);  //checkpointing
        entry void recvData(CkArrayCheckPTMessage *);
+       entry void recvArrayCheckpoint(CkArrayCheckPTMessage *);
        entry void gotData();
        entry void recvProcData(CkProcCheckPTMessage *);
        entry void syncFiles(CkReductionMsg *);
index bc5dcdd3320c5bc23775bbd4cc28d2a4631d32b6..3829aab5a8aae791e91c6233a6c549ff4a37f668 100644 (file)
@@ -21,6 +21,14 @@ public:
        int cp_flag;          // 1: from checkpoint 0: from recover
 };
 
+class CkCheckPTMessage: public CMessage_CkArrayCheckPTMessage {
+public:
+       double *packData;
+       int bud1, bud2;
+       int len;
+       int cp_flag;          // 1: from checkpoint 0: from recover
+};
+
 class CkProcCheckPTMessage: public CMessage_CkProcCheckPTMessage {
 public:
        int pe;
@@ -81,6 +89,10 @@ public:
   void updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe);
   void resetLB(int diepe);
   int  isFailed(int pe);
+  void pupAllElements(PUP::er &p);
+  void startArrayCheckpoint();
+  void recvArrayCheckpoint(CkArrayCheckPTMessage *m);
+  void recoverAll(CkArrayCheckPTMessage * msg, CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap);
 public:
   static CkCallback  cpCallback;
 
@@ -89,6 +101,7 @@ public:
   static char*  stage;
 private:
   CkVec<CkCheckPTInfo *> ckTable;
+  CkArrayCheckPTMessage * chkpTable[2];
 
   int recvCount, peCount;
   int expectCount, ackCount;