fix group reduction after restart.
authorGengbin Zheng <gzheng@illinois.edu>
Sun, 13 Nov 2011 03:25:19 +0000 (21:25 -0600)
committerGengbin Zheng <gzheng@illinois.edu>
Sun, 13 Nov 2011 03:25:19 +0000 (21:25 -0600)
Using group reduction instead of all PEs sending to PE 0 to finish a checkpointing.

src/ck-core/ckarray.h
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckreduction.C
src/ck-core/ckreduction.h

index 07ec971f31c43bbcebf74b24a1b408054a807e6f..a951572b9c7be6df4301761d2168539d77553dec 100644 (file)
@@ -737,7 +737,7 @@ private:
   CkArrayReducer *reducer; //Read-only copy of default reducer
   CkArrayBroadcaster *broadcaster; //Read-only copy of default broadcaster
 public:
-  void flushStates() { CkReductionMgr::flushStates(); CK_ARRAYLISTENER_LOOP(listeners, l->flushState()); }
+  void flushStates() { CkReductionMgr::flushStates(0); CK_ARRAYLISTENER_LOOP(listeners, l->flushState()); }
 #if (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
        // the mlogft only support 1D arrays, then returning the number of elements in the first dimension
        virtual int numberReductionMessages(){CkAssert(CkMyPe() == 0);return numInitial.data()[0];}
index 613e3131e643bddd14e72718d5ae8e1850b0e411..6d82761293cce97a5d2d743991c7ec648bd588b9 100644 (file)
@@ -332,8 +332,9 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|ckCheckPTGroupID;          // recover global variable
   p|cpCallback;                        // store callback
   p|where;                     // where to checkpoint
+  p|peCount;
   if (p.isUnpacking()) {
-    recvCount = peCount = 0;
+    recvCount = 0;
 #if CMK_CONVERSE_MPI
     void pingBuddy();
     void pingCheckHandler();
@@ -463,7 +464,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
   }
     // if my table is empty, then I am done
-  if (len == 0) thisProxy[cpStarter].cpFinish();
+  if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
 
   // pack and send proc level data
   sendProcData();
@@ -517,7 +518,7 @@ void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
   CpvAccess(procChkptBuf) = msg;
   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
-  thisProxy[msg->reportPe].cpFinish();
+  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
 }
 
 // ArrayElement call this function to give us the checkpointed data
@@ -538,7 +539,7 @@ void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
     recvCount ++;
     if (recvCount == ckTable.length()) {
       if (where == CkCheckPoint_inMEM) {
-        thisProxy[cpStarter].cpFinish();
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
       }
       else if (where == CkCheckPoint_inDISK) {
         // another barrier for finalize the writing using fsync
@@ -559,7 +560,7 @@ void CkMemCheckPT::syncFiles(CkReductionMsg *m)
 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
   system("sync");
 #endif
-  thisProxy[cpStarter].cpFinish();
+  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
 }
 
 // only is called on cpStarter when checkpoint is done
@@ -568,7 +569,7 @@ void CkMemCheckPT::cpFinish()
   CmiAssert(CkMyPe() == cpStarter);
   peCount++;
     // now that all processors have finished, activate callback
-  if (peCount == 2*(CkNumPes())) {
+  if (peCount == 2) {
     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
     cpCallback.send();
     peCount = 0;
@@ -909,8 +910,8 @@ void CkMemCheckPT::recoverArrayElements()
   CKLOCMGR_LOOP(mgr->doneInserting(););
 
   inRestarting = 0;
- // _crashedNode = -1;
-CpvAccess(_crashedNode) = -1;
 // _crashedNode = -1;
+  CpvAccess(_crashedNode) = -1;
 
   if (CkMyPe() == 0)
     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
index a07e72db2e3f0d56df5317c5ea7cf122e6bfe049..f95f576df68fc20c9ac32d469ccac80997e20e76 100644 (file)
@@ -21,7 +21,7 @@ module CkMemCheckpoint {
        entry void gotData();
        entry void recvProcData(CkProcCheckPTMessage *);
        entry void syncFiles(CkReductionMsg *);
-       entry void cpFinish();
+       entry [reductiontarget] void cpFinish();
        entry void report();
        // restart
         entry [expedited] void restart(int);
index d0d420492fd534f581aa4e58cc1c8c67e8d10d3c..c09fea060a4402e12d737d3cb0e7bb70772f191e 100644 (file)
@@ -221,14 +221,14 @@ CkReductionMgr::CkReductionMgr(CkMigrateMessage *m) :CkGroupInitCallback(m)
   DEBR((AA"In reductionMgr migratable constructor at %d \n"AB,this));
 }
 
-void CkReductionMgr::flushStates()
+void CkReductionMgr::flushStates(int isgroup)
 {
   // CmiPrintf("[%d] CkReductionMgr::flushState\n", CkMyPe());
   redNo=0;
   completedRedNo = -1;
   inProgress=CmiFalse;
   creating=CmiFalse;
-  gcount=lcount=0;
+  if (!isgroup) gcount=lcount=0;    // array reduction group needs to reset to 0
   startRequested=CmiFalse;
   nContrib=nRemote=0;
   maxStartRequest=0;
index f732547ef4fc609f558ee0bbe4453e60bc2c70fe..6968464b564148979db04c5c0c50254ac792857a 100644 (file)
@@ -149,7 +149,7 @@ public:
        void endArrayReduction();
 
        virtual CmiBool isReductionMgr(void){ return CmiTrue; }
-       virtual void flushStates();
+       virtual void flushStates(int isgroup);
        /*FAULT_EVAC: used to get the gcount on a processor when 
                it is evacuated.
                TODO: It needs to be fixed as it should return the gcount
@@ -414,7 +414,7 @@ class Group : public CkReductionMgr
        virtual int isNodeGroup() { return 0; }
        virtual void pup(PUP::er &p);
        virtual void flushStates() {
-               CkReductionMgr::flushStates();
+               CkReductionMgr::flushStates(1);
                reductionInfo.redNo = 0;
        }
        virtual void CkAddThreadListeners(CthThread tid, void *msg);