cleanm up flag xiang/optChkp
authorXiang Ni <xini@login2.intrepid.alcf.anl.gov>
Thu, 25 Oct 2012 20:00:49 +0000 (20:00 +0000)
committerXiang Ni <xini@login2.intrepid.alcf.anl.gov>
Thu, 25 Oct 2012 20:00:49 +0000 (20:00 +0000)
src/ck-core/cklocation.C
src/ck-core/cklocation.h
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.h

index 4134e9935e1c8770a6361113609a23b40d934add..64551958a0bd9eacbbe5898f45d14c66168d852b 100644 (file)
@@ -1994,14 +1994,30 @@ void CkLocMgr::staticSpringCleaning(void *forWhom,double curWallTime) {
        DEBK((AA"Starting spring cleaning at %.2f\n"AB,CkWallTimer()));
        ((CkLocMgr *)forWhom)->springCleaning();
 }
+//doesn't delete if there is extra pe
+void CkLocMgr::flushLocalRecs(void)
+{
+  void *objp;
+  void *keyp;
+  CkHashtableIterator *it=hash.iterator();
+  CmiImmediateLock(hashImmLock);
+  while (NULL!=(objp=it->next(&keyp))) {
+    CkLocRec *rec=*(CkLocRec **)objp;
+    CkArrayIndex &idx=*(CkArrayIndex *)keyp;
+    if (rec->type() == CkLocRec::local) {
+        callMethod((CkLocRec_local*)rec, &CkMigratable::ckDestroy);
+        it->seek(-1);//retry this hash slot
+    }
+  }
+  delete it;
+  CmiImmediateUnlock(hashImmLock);
+}
 
 // clean all buffer'ed messages and also free local objects
 void CkLocMgr::flushAllRecs(void)
 {
   void *objp;
   void *keyp;
-  int flag_local = 0;  
-  int flag_remote = 0;  
   CkHashtableIterator *it=hash.iterator();
   CmiImmediateLock(hashImmLock);
   while (NULL!=(objp=it->next(&keyp))) {
@@ -2013,17 +2029,14 @@ void CkLocMgr::flushAllRecs(void)
       //this condition
       
       if(_BgOutOfCoreFlag!=1){
-                 //TODO doesn't delete if there is actual pe
-                 //hash.remove(*(CkArrayIndex *)&idx);
-        //delete rec;
-        //it->seek(-1);//retry this hash slot
-        flag_remote++;
+       hash.remove(*(CkArrayIndex *)&idx);
+        delete rec;
+        it->seek(-1);//retry this hash slot
       }
     }
     else {
         callMethod((CkLocRec_local*)rec, &CkMigratable::ckDestroy);
         it->seek(-1);//retry this hash slot
-        flag_local++;
     }
   }
   delete it;
index 3db72e819d4fae7fd548900843119d59a0aa9d8f..f02601a91838542d663db8097864f67428c26d04 100644 (file)
@@ -670,6 +670,7 @@ public:
        void migratableList(CkLocRec_local *rec, CkVec<CkMigratable *> &list);
 
        void flushAllRecs(void);
+       void flushLocalRecs(void);
        void pup(PUP::er &p);
        
        //Look up array element in hash table.  Index out-of-bounds if not found.
index f2775937cb56c5aef60eaec7f92b4c763acbb454..ee0ac614a065d711921d81b54636062d5f6e8eef 100644 (file)
@@ -67,7 +67,9 @@ void noopck(const char*, ...)
 
 #define CMK_CHKP_ALL           1
 #define CMK_USE_BARRIER                1
-#define STREAMING_INFORMHOME                    0
+
+//stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
+#define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 
 // static, so that it is accessible from Converse part
@@ -370,10 +372,10 @@ void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m)
   PUP::fromMem p(m->packData);
   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
   CmiAssert(mgr);
-#if STREAMING_INFORMHOME
-  mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
-#else
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
+#else
+  mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
 #endif
 
   // find a list of array elements bound together
@@ -678,7 +680,8 @@ void CkMemCheckPT::cpFinish()
   CmiAssert(CkMyPe() == cpStarter);
   peCount++;
     // now that all processors have finished, activate callback
-  if (peCount == 2) {
+  if (peCount == 2) 
+{
     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
     cpCallback.send();
     peCount = 0;
@@ -845,8 +848,11 @@ void CkMemCheckPT::removeArrayElements()
 
   // get rid of all buffering and remote recs
   // including destorying all array elements
-  CKLOCMGR_LOOP(mgr->flushAllRecs(););
-
+#if CK_NO_PROC_POOL  
+       CKLOCMGR_LOOP(mgr->flushAllRecs(););
+#else
+       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+#endif
 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
 
   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
@@ -979,10 +985,12 @@ void CkMemCheckPT::recoverArrayElements()
  int flag = 0;
   // recover all array elements
   int count = 0;
-//#if STREAMING_INFORMHOME
+
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
-//#endif
+#endif
+
 #if !CMK_CHKP_ALL
   for (int idx=0; idx<len; idx++)
   {
@@ -1003,7 +1011,7 @@ void CkMemCheckPT::recoverArrayElements()
     // gzheng
     //thisProxy[CkMyPe()].inmem_restore(msg);
     inmem_restore(msg);
-#if STREAMING_INFORMHOME
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
     int homePe = mgr->homePe(msg->index);
     if (homePe != CkMyPe()) {
@@ -1016,36 +1024,42 @@ void CkMemCheckPT::recoverArrayElements()
   }
 #else
        CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
        recoverAll(msg,gmap,imap);
+#else
+       recoverAll(msg);
+#endif
     CkFreeMsg(msg);
 #endif
   curTime = CmiWallTimer();
   if (CkMyPe() == thisFailedPe)
        CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
-#if STREAMING_INFORMHOME
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
   for (int i=0; i<CkNumPes(); i++) {
     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
        flag++; 
          }
   }
-#endif
   delete [] imap;
   delete [] gmap;
+#endif
   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
 
   CKLOCMGR_LOOP(mgr->doneInserting(););
 
   // _crashedNode = -1;
   CpvAccess(_crashedNode) = -1;
-if(CkMyPe()!=thisFailedPe)
   inRestarting = 0;
- // if (CkMyPe() == 0)
-   // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
+  if (CkMyPe() == 0)
+    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+#else
 if(flag == 0)
 {
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
 }
+#endif
 }
 
 void CkMemCheckPT::gotReply(){
@@ -1065,26 +1079,12 @@ void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gma
                        p|idx;
                        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
                        int homePe = mgr->homePe(idx);
-#if STREAMING_INFORMHOME
-                       mgr->resume(idx,p,CmiFalse);
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
+                       mgr->resume(idx,p,CmiTrue);
 #else
-                               mgr->resume(idx,p,CmiFalse);
-#endif
-                         /*CkLocRec_local *rec = loc.getLocalRecord();
-                         CmiAssert(rec);
-                         CkVec<CkMigratable *> list;
-                         mgr->migratableList(rec, list);
-                         CmiAssert(list.length() > 0);
-                         for (int l=0; l<list.length(); l++) {
-                               ArrayElement * elt = (ArrayElement *)list[l];
-                               //    reset, may not needed now
-                               // for now.
-                               for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
-                                 contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
-                                 if (c) c->redNo = 0;
-                               }
-                         }*/
-#if STREAMING_INFORMHOME
+                       mgr->resume(idx,p,CmiFalse);
+#endif
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
                        homePe = mgr->homePe(idx);
                        if (homePe != CkMyPe()) {
                          gmap[homePe].push_back(gID);
@@ -1093,6 +1093,8 @@ void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gma
 #endif
                }
        }
+       if(CkMyPe()==thisFailedPe)
+       CkPrintf("recover all ends\n"); 
 #endif
 }
 
@@ -1107,7 +1109,6 @@ void CkMemCheckPT::finishUp()
   
   if (CkMyPe() == thisFailedPe)
   {
-       inRestarting=0;
        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
        //CkStartQD(cpCallback);
        cpCallback.send();
index e61bff7a1b5a38c483c47d1142d9742c22369852..5be4b9b27d146a214213226b407edb684bfdb8db 100644 (file)
@@ -93,7 +93,7 @@ public:
   void pupAllElements(PUP::er &p);
   void startArrayCheckpoint();
   void recvArrayCheckpoint(CkArrayCheckPTMessage *m);
-  void recoverAll(CkArrayCheckPTMessage * msg, CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap);
+  void recoverAll(CkArrayCheckPTMessage * msg, CkVec<CkGroupID> * gmap=NULL, CkVec<CkArrayIndex> * imap=NULL);
 public:
   static CkCallback  cpCallback;