more optimization
authorXiang Ni <xini@login5.intrepid.alcf.anl.gov>
Sun, 21 Oct 2012 19:01:28 +0000 (19:01 +0000)
committerXiang Ni <xini@login5.intrepid.alcf.anl.gov>
Sun, 21 Oct 2012 19:01:28 +0000 (19:01 +0000)
src/ck-core/cklocation.C
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h
src/ck-ldb/GreedyLB.C

index bb77ad6b882cd2b525f86f1642cf8cd50cb3fc78..c5e2dd44aa2765dde2dd78cfb87b5d77c7a64b03 100644 (file)
@@ -1873,9 +1873,9 @@ void CkLocMgr::flushAllRecs(void)
       //the meta data in the location manager are not deleted so we need
       //this condition
       if(_BgOutOfCoreFlag!=1){
-        hash.remove(*(CkArrayIndex *)&idx);
-        delete rec;
-        it->seek(-1);//retry this hash slot
+      //  hash.remove(*(CkArrayIndex *)&idx);
+       // delete rec;
+       // it->seek(-1);//retry this hash slot
       }
     }
     else {
@@ -2208,6 +2208,9 @@ CmiBool CkLocMgr::addElementToRec(CkLocRec_local *rec,ManagerRec *m,
 }
 void CkLocMgr::updateLocation(const CkArrayIndex &idx,int nowOnPe) {
        inform(idx,nowOnPe);
+       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+       if(CkInRestarting())
+       checkptMgr[nowOnPe].gotReply();
 }
 
 /*************************** LocMgr: DELETION *****************************/
index 173f9653c02b5c13b5bd41fc26d79981c73009b6..ac01b5fa545f4882a17d6819f8bedc81d190442c 100644 (file)
@@ -66,6 +66,7 @@ void noopck(const char*, ...)
 #endif
 
 #define CMK_CHKP_ALL           1
+#define CMK_USE_BARRIER                1
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 
@@ -969,7 +970,7 @@ void CkMemCheckPT::recoverArrayElements()
   if (CkMyPe() == thisFailedPe)
   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
   startTime = curTime;
-
+ int flag = 0;
   // recover all array elements
   int count = 0;
 #if STREAMING_INFORMHOME
@@ -1012,11 +1013,16 @@ void CkMemCheckPT::recoverArrayElements()
        recoverAll(msg,gmap,imap);
     CkFreeMsg(msg);
 #endif
+  curTime = CmiWallTimer();
+  if (CkMyPe() == thisFailedPe)
+       CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
 #if STREAMING_INFORMHOME
   for (int i=0; i<CkNumPes(); i++) {
-    if (gmap[i].size() && i!=CkMyPe()) {
+    if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
-    }
+       CkPrintf("[%d] send to %d\n",CkMyPe(),i);
+       flag++; 
+       }
   }
   delete [] imap;
   delete [] gmap;
@@ -1025,12 +1031,20 @@ void CkMemCheckPT::recoverArrayElements()
 
   CKLOCMGR_LOOP(mgr->doneInserting(););
 
-  inRestarting = 0;
   // _crashedNode = -1;
   CpvAccess(_crashedNode) = -1;
+if(CkMyPe()!=thisFailedPe)
+  inRestarting = 0;
+ // if (CkMyPe() == 0)
+   // CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+if(flag == 0)
+{
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
+}
+}
 
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+void CkMemCheckPT::gotReply(){
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
 }
 
 void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
@@ -1087,6 +1101,7 @@ void CkMemCheckPT::finishUp()
   
   if (CkMyPe() == thisFailedPe)
   {
+       inRestarting=0;
        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
        //CkStartQD(cpCallback);
        cpCallback.send();
@@ -1175,8 +1190,19 @@ static int notifyHandlerIdx;
 // called on crashed PE
 static void restartBeginHandler(char *msg)
 {
-#if CMK_MEM_CHECKPOINT
   CmiFree(msg);
+#if CMK_MEM_CHECKPOINT
+#if CMK_USE_BARRIER
+       if(CkMyPe()!=_diePE){
+               printf("restar begin on %d\n",CkMyPe());
+               char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+               CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+               CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else{
+       CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
+       CkRestartCheckPointCallback(NULL, NULL);
+       }
+#else
   static int count = 0;
   CmiAssert(CkMyPe() == _diePE);
   count ++;
@@ -1185,6 +1211,7 @@ static void restartBeginHandler(char *msg)
     count = 0;
   }
 #endif
+#endif
 }
 
 extern void _discard_charm_message();
@@ -1222,8 +1249,14 @@ static void restartBcastHandler(char *msg)
     // reduction
   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+#if CMK_USE_BARRIER
+       //CmiPrintf("before reduce\n"); 
+       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+       //CmiPrintf("after reduce\n");  
+#else
   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
-  checkpointed = 0;
+#endif 
+ checkpointed = 0;
 #endif
 }
 
@@ -1290,7 +1323,7 @@ static void askProcDataHandler(char *msg)
 // called on PE 0
 void qd_callback(void *m)
 {
-   CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
+   CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
    CkFreeMsg(m);
 #ifdef CMK_SMP
    for(int i=0;i<CmiMyNodeSize();i++){
@@ -1479,7 +1512,7 @@ void pingCheckHandler()
 {
 #if CMK_MEM_CHECKPOINT
   double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 4 && (!CkInLdb()||buddy!=0)) {
+  if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
     int i, pe, buddy;
     // tell everyone the buddy dies
     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
index 742c89a5fc54841f9999d9b83b6e29f86064ca22..a0f062255789a8c61909649602b08bf2c934cb31 100644 (file)
@@ -31,7 +31,8 @@ module CkMemCheckpoint {
        entry [reductiontarget] void recoverBuddies();
        entry void recoverEntry(CkArrayCheckPTMessage *msg);
        entry [reductiontarget] void recoverArrayElements();
-       entry void finishUp();
+       entry [reductiontarget] void finishUp();
+       entry void gotReply();
        entry void quiescence(CkCallback&);
         entry void inmem_restore(CkArrayCheckPTMessage *m);
        entry void updateLocations(int n, CkGroupID g[n], CkArrayIndex idx[n],int nowOnPe);
index 901634ef06765f91e1d97aebfe5396f69a216bec..e61bff7a1b5a38c483c47d1142d9742c22369852 100644 (file)
@@ -85,6 +85,7 @@ public:
   void quiescence(CkCallback &);
   void resetReductionMgr();
   void finishUp();
+  void gotReply();
   void inmem_restore(CkArrayCheckPTMessage *m);
   void updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe);
   void resetLB(int diepe);
index 06a128541bc2780c313d4d700384e518f72ec667..32ef0d1094e01862da8fb263f1034e54dc6030fc 100644 (file)
@@ -1,3 +1,10 @@
+/*****************************************************************************
+ * $Source$
+ * $Author$
+ * $Date$
+ * $Revision$
+ *****************************************************************************/
+
 /**
  * \addtogroup CkLdb
 */
       and the nonmigratable object is not taken in the objData array
 */
 
-#include <algorithm>
-
 #include "charm++.h"
 
 
-#include "ckgraph.h"
 #include "cklists.h"
 #include "GreedyLB.h"
 
-using namespace std;
-
 CreateLBFunc_Def(GreedyLB, "always assign the heaviest obj onto lightest loaded processor.")
 
 GreedyLB::GreedyLB(const CkLBOptions &opt): CentralLB(opt)
@@ -37,121 +39,202 @@ CmiBool GreedyLB::QueryBalanceNow(int _step)
   return CmiTrue;
 }
 
-class ProcLoadGreater {
-  public:
-    bool operator()(ProcInfo p1, ProcInfo p2) {
-      return (p1.totalLoad() > p2.totalLoad());
-    }
-};
+CmiBool  GreedyLB::Compare(double x, double y, HeapCmp cmp)
+{
+  const int test =  ((cmp == GT) ? (x > y) : (x < y));
 
-class ObjLoadGreater {
-  public:
-    bool operator()(Vertex v1, Vertex v2) {
-      return (v1.getVertexLoad() > v2.getVertexLoad());
-    }
-};
+  if (test) return CmiTrue; 
+  else return CmiFalse;
+}
 
-void GreedyLB::work(LDStats* stats)
+
+void GreedyLB::Heapify(HeapData *heap, int node, int heapSize, HeapCmp cmp)
 {
-  int  obj, objCount, pe;
-  int n_pes = stats->nprocs();
-  int *map = new int[n_pes];
+  int left = 2*node+1;
+  int right = 2*node+2;
+  int xchange;
+
+  //heap[left].load > heap[node].load)
+  if (left <= heapSize &&  Compare(heap[left].load, heap[node].load, cmp))
+    xchange = left;
+  else xchange = node;
+  //heap[right].load > heap[xchange].load) 
+  if (right <= heapSize && Compare(heap[right].load, heap[xchange].load, cmp))
+    xchange = right;
+
+  if (xchange != node) {
+    HeapData obj;
+    obj = heap[node];
+    heap[node] = heap[xchange];
+    heap[xchange] = obj;
+    Heapify(heap, xchange, heapSize, cmp);
+  }    
+}
+
+void GreedyLB::BuildHeap(HeapData *data, int heapSize, HeapCmp cmp)
+{
+       int i;
+       for(i=heapSize/2; i >= 0; i--)
+               Heapify(data, i, heapSize, cmp);
+}
 
-  std::vector<ProcInfo>  procs;
-  for(pe = 0; pe < n_pes; pe++) {
+void GreedyLB::HeapSort(HeapData *data, int heapSize, HeapCmp cmp)
+{
+       int i;
+       HeapData key;
+
+        int origSize = heapSize;
+       BuildHeap(data, heapSize, cmp);
+        for (i=heapSize; i > 0; i--) {
+               key = data[0];
+               data[0] = data[i];
+               data[i] = key;
+               heapSize--;
+               Heapify(data, 0, heapSize, cmp);
+       }
+       // after HeapSort, the data are in reverse order
+        for (i=0; i<(origSize+1)/2; i++) {
+          key = data[i];
+          data[i] = data[origSize-i];
+          data[origSize-i] = key;
+        }
+}
+
+GreedyLB::HeapData* 
+GreedyLB::BuildObjectArray(BaseLB::LDStats* stats, 
+                             int count, int *objCount)
+{
+  HeapData *objData;
+  int obj;
+
+//for (obj = 0; obj < stats[pe].n_objs; obj++)
+//if (stats[pe].objData[obj].migratable == CmiTrue) (*objCount)++; 
+
+  objData  = new HeapData[stats->n_objs];
+  *objCount = 0; 
+  for(obj=0; obj < stats->n_objs; obj++) {
+    LDObjData &oData = stats->objData[obj];
+    int pe = stats->from_proc[obj];
+    if (!oData.migratable) {
+      if (!stats->procs[pe].available) 
+        CmiAbort("GreedyLB cannot handle nonmigratable object on an unavial processor!\n");
+      continue;
+    }
+    objData[*objCount].load = oData.wallTime * stats->procs[pe].pe_speed;
+    objData[*objCount].pe = pe;
+    objData[*objCount].id = obj;
+    (*objCount)++;
+  }
+  
+  HeapSort(objData, *objCount-1, GT);
+/*
+for (int i=0; i<*objCount; i++)
+  CmiPrintf("%f ", objData[i].load);
+CmiPrintf("\n");
+*/
+  return objData;
+}
+
+GreedyLB::HeapData* 
+GreedyLB::BuildCpuArray(BaseLB::LDStats* stats, 
+                          int count, int *peCount)
+{
+  int pe;
+
+  *peCount = 0;
+  for (pe = 0; pe < count; pe++)
+    if (stats->procs[pe].available) (*peCount)++;
+  HeapData *data = new HeapData[*peCount];
+  int *map = new int[count];
+  
+  *peCount = 0;
+  for (pe=0; pe < count; pe++) {
+    CentralLB::ProcStats &peData = stats->procs[pe];
     map[pe] = -1;
-    if (stats->procs[pe].available) {
-      map[pe] = procs.size();
-      procs.push_back(ProcInfo(pe, stats->procs[pe].bg_walltime, 0.0, stats->procs[pe].pe_speed, true));
+    if (peData.available) 
+    {
+       data[*peCount].load = 0.0;
+      data[*peCount].load += peData.bg_walltime;
+      data[*peCount].pe = data[*peCount].id = pe;
+      map[pe] = *peCount;
+      (*peCount)++;
     }
   }
 
   // take non migratbale object load as background load
-  for (obj = 0; obj < stats->n_objs; obj++)
-  {
+  for (int obj = 0; obj < stats->n_objs; obj++) 
+  { 
       LDObjData &oData = stats->objData[obj];
       if (!oData.migratable)  {
         int pe = stats->from_proc[obj];
         pe = map[pe];
-        if (pe==-1)
+        if (pe==-1) 
           CmiAbort("GreedyLB: nonmigratable object on an unavail processor!\n");
-        procs[pe].totalLoad() += oData.wallTime;
+        data[pe].load += oData.wallTime;
       }
   }
-  delete [] map;
 
   // considering cpu speed
-  for (pe = 0; pe<procs.size(); pe++) {
-    procs[pe].totalLoad() +=  procs[pe].overhead();
-    procs[pe].totalLoad() *= procs[pe].pe_speed();
-  }
+  for (pe = 0; pe<*peCount; pe++)
+    data[pe].load *= stats->procs[data[pe].pe].pe_speed;
 
-  // build object array
-  std::vector<Vertex> objs;
+  BuildHeap(data, *peCount-1, LT);     // minHeap
+  delete [] map;
+  return data;
+}
 
-  for(int obj = 0; obj < stats->n_objs; obj++) {
-    LDObjData &oData = stats->objData[obj];
-    int pe = stats->from_proc[obj];
-    if (!oData.migratable) {
-      if (!stats->procs[pe].available) 
-        CmiAbort("GreedyLB cannot handle nonmigratable object on an unavial processor!\n");
-      continue;
-    }
-    double load = oData.wallTime * stats->procs[pe].pe_speed;
-    objs.push_back(Vertex(obj, load, stats->objData[obj].migratable, stats->from_proc[obj]));
-  }
+void GreedyLB::work(LDStats* stats)
+{
+  int  obj, heapSize, objCount;
+  int n_pes = stats->nprocs();
 
-  // max heap of objects
-  sort(objs.begin(), objs.end(), ObjLoadGreater());
-  // min heap of processors
-  make_heap(procs.begin(), procs.end(), ProcLoadGreater());
+  HeapData *cpuData = BuildCpuArray(stats, n_pes, &heapSize);
+  HeapData *objData = BuildObjectArray(stats, n_pes, &objCount);
 
   if (_lb_args.debug()>1) 
     CkPrintf("[%d] In GreedyLB strategy\n",CkMyPe());
-
-
-    // greedy algorithm
+  heapSize--;
   int nmoves = 0;
-  for (obj=0; obj < objs.size(); obj++) {
-    ProcInfo p = procs.front();
-    pop_heap(procs.begin(), procs.end(), ProcLoadGreater());
-    procs.pop_back();
+  for (obj=0; obj < objCount; obj++) {
+    HeapData minCpu;  
+    // Operation of extracting the least loaded processor
+    // from the heap
+    minCpu = cpuData[0];
+    cpuData[0] = cpuData[heapSize];
+    heapSize--;
+    Heapify(cpuData, 0, heapSize, LT);    
 
     // Increment the time of the least loaded processor by the cpuTime of
     // the `heaviest' object
-    p.totalLoad() += objs[obj].getVertexLoad();
+    minCpu.load += objData[obj].load;
 
     //Insert object into migration queue if necessary
-    const int dest = p.getProcId();
-    const int pe   = objs[obj].getCurrentPe();
-    const int id   = objs[obj].getVertexId();
+    const int dest = minCpu.pe;
+    const int pe   = objData[obj].pe;
+    const int id   = objData[obj].id;
     if (dest != pe) {
       stats->to_proc[id] = dest;
       nmoves ++;
       if (_lb_args.debug()>2) 
-        CkPrintf("[%d] Obj %d migrating from %d to %d\n", CkMyPe(),objs[obj].getVertexId(),pe,dest);
+        CkPrintf("[%d] Obj %d migrating from %d to %d\n", CkMyPe(),objData[obj].id,pe,dest);
     }
 
     //Insert the least loaded processor with load updated back into the heap
-    procs.push_back(p);
-    push_heap(procs.begin(), procs.end(), ProcLoadGreater());
+    heapSize++;
+    int location = heapSize;
+    while (location>0 && cpuData[(location-1)/2].load > minCpu.load) {
+      cpuData[location] = cpuData[(location-1)/2];
+      location = (location-1)/2;
+    }
+    cpuData[location] = minCpu;
   }
 
   if (_lb_args.debug()>0) 
     CkPrintf("[%d] %d objects migrating.\n", CkMyPe(), nmoves);
 
-  if (_lb_args.debug()>1)  {
-    CkPrintf("CharmLB> Min obj: %f  Max obj: %f\n", objs[objs.size()-1].getVertexLoad(), objs[0].getVertexLoad());
-    CkPrintf("CharmLB> PE speed:\n");
-    for (pe = 0; pe<procs.size(); pe++)
-      CkPrintf("%f ", procs[pe].pe_speed());
-    CkPrintf("\n");
-    CkPrintf("CharmLB> PE Load:\n");
-    for (pe = 0; pe<procs.size(); pe++)
-      CkPrintf("%f (%f)  ", procs[pe].totalLoad(), procs[pe].overhead());
-    CkPrintf("\n");
-  }
-
+  delete [] objData;
+  delete [] cpuData;
 }
 
 #include "GreedyLB.def.h"