Fix a few bugs in the neighbor seed balancer so that tasks spread more quickly
author Yanhua Yanhua <sun51@illinois.edu>
Wed, 28 Jul 2010 17:36:15 +0000 (12:36 -0500)
committer Yanhua Yanhua <sun51@illinois.edu>
Wed, 28 Jul 2010 17:36:15 +0000 (12:36 -0500)
src/conv-ldb/cldb.neighbor.c

index 17bda2a1767215d093017b8106bad78c2223b33c..a9fea7035095d8890ffdc309fb6cbae78fa5598f 100644 (file)
 #define TRACE_USEREVENTS        0
 
 #define PERIOD 20                /* default: 30 */
+#define MSGDELAY 10
 #define MAXOVERLOAD 1
 
+#define MAXBEFOREBALANCE 2
+
+#define LOADTHRESH       3
+
+typedef struct loadproc_s{
+    int index;
+    int load;
+}loadproc;
+
 typedef struct CldProcInfo_s {
   double lastCheck;
   int    sent;                 /* flag to disable idle work request */
   int    balanceEvt;           /* user event for balancing */
   int    idleEvt;              /* user event for idle balancing */
   int    idleprocEvt;          /* user event for processing idle req */
+  double lastBalanceTime;
 } *CldProcInfo;
 
 extern char *_lbtopo;                  /* topology name string */
@@ -65,13 +76,11 @@ static void CldStillIdle(void *dummy, double curT)
 
   double now = curT;
   double lt = cldData->lastCheck;
-    
   /* only ask for work every 20ms */
   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
   cldData->lastCheck = now;
 
   myload = CldLoad();
-  /* CmiAssert(myload == 0); */
   if (myload > 0) return;
 
   msg.from_pe = CmiMyPe();
@@ -86,6 +95,10 @@ static void CldStillIdle(void *dummy, double curT)
     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
   }
+  /*
+  msg.to_rank = rand() % CmiNumPes();
+  CmiSyncNodeSend(CmiNodeOf(msg.to_rank),sizeof(requestmsg),(char *)&msg);
+  */
 #endif
   cldData->sent = 1;
 
@@ -101,7 +114,10 @@ static void CldAskLoadHandler(requestmsg *msg)
   int receiver, rank, recvIdx, i;
   int myload = CldLoad();
   double now = CmiWallTimer();
+  CldProcInfo  cldData = CpvAccess(CldData);
 
+  if(now - cldData->lastBalanceTime < MSGDELAY * 0.001)
+      return;
   /* only give you work if I have more than 1 */
   if (myload>0) {
     int sendLoad;
@@ -116,9 +132,9 @@ static void CldAskLoadHandler(requestmsg *msg)
     }
     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
 #endif
-    sendLoad = myload / CpvAccess(numNeighbors) / 2;
+    sendLoad = myload / CpvAccess(numNeighbors);
     if (sendLoad < 1) sendLoad = 1;
-    sendLoad = 1;
+    //sendLoad = 1;
     for (i=0; i<CpvAccess(numNeighbors); i++) 
       if (CpvAccess(neighbors)[i].pe == receiver) break;
     CmiAssert(i<CpvAccess(numNeighbors));
@@ -202,15 +218,35 @@ int CldMinAvg()
     CpvAccess(MinLoad) = CldLoad();
     CpvAccess(MinProc) = CmiMyPe();
   }
-  i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
+  i = sum/(nNeighbors+1);
+  if (sum % (nNeighbors+1) != 0) i++;
+  /*
+  if( ((float)sum /(float)(nNeighbors+1) - sum/(nNeighbors+1)) > 0.000001)
+      i = i+1;
+  */
   return i;
 }
 
+static int comp(const void *p1, const void *p2)
+{
+    loadproc *l1 = (loadproc *)p1;
+    loadproc *l2 = (loadproc *)p2;
+    if ( l1->load < l2->load) return -1;
+    else if (l1->load == l2->load) return 0;
+    else return 1;
+}
+
 void CldBalance(void *dummy, double curT)
 {
-  int i, j, overload, numToMove=0, avgLoad;
+  int i, j, k, overload, numToMove=0, avgLoad;
   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
-
+  int nNeighbors = CpvAccess(numNeighbors);
+  CldProcInfo  cldData = CpvAccess(CldData);
+  loadproc* sorted_byload = (loadproc*)malloc(sizeof(loadproc) * nNeighbors);
+  int myload = CldLoad();
+  int minload;
+  int numberoflessthanme = 0;
+  cldData->lastBalanceTime = CmiWallTimer();
 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
   double startT = curT;
 #endif
@@ -221,36 +257,89 @@ void CldBalance(void *dummy, double curT)
   if (overload > CldCountTokens())
     overload = CldCountTokens();
 
+  int already_sort = 0;
+  int thisload;
+  int sum = 0;
+
+  for (i=0; i<nNeighbors; i++) {
+      thisload = CpvAccess(neighbors)[i].load;
+      if(thisload < myload)
+          numberoflessthanme++;
+       sorted_byload[i].load = thisload;
+       sorted_byload[i].index = i;
+  }
+  qsort(sorted_byload, nNeighbors, sizeof(loadproc), comp);
+  
+  minload = sorted_byload[0].load;
+
   if (overload > MAXOVERLOAD) {
     int nNeighbors = CpvAccess(numNeighbors);
     for (i=0; i<nNeighbors; i++)
-      if (CpvAccess(neighbors)[i].load < avgLoad) {
-        totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
-        if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
-          maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
+      if (sorted_byload[i].load < avgLoad) {
+        totalUnderAvg += avgLoad-sorted_byload[i].load;
+        if (avgLoad - sorted_byload[i].load > maxUnderAvg)
+          maxUnderAvg = avgLoad - sorted_byload[i].load;
         numUnderAvg++;
       }
     if (numUnderAvg > 0)
-      for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
-       j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
-        if (CpvAccess(neighbors)[j].load < avgLoad) {
-          numToMove = (avgLoad - CpvAccess(neighbors)[j].load)/numUnderAvg;
-          if (numToMove > overload)
-            numToMove = overload;
-          overload -= numToMove;
-         CpvAccess(neighbors)[j].load += numToMove;
+        for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
+            j = (sorted_byload[i].index % nNeighbors);
+            if (CpvAccess(neighbors)[j].load < avgLoad) {
+                numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
+                if (numToMove > overload)
+                    numToMove = overload;
+                overload -= numToMove;
+                CpvAccess(neighbors)[j].load += numToMove;
+#if CMK_MULTICORE
+                CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
+#else
+                CldMultipleSend(CpvAccess(neighbors)[j].pe, 
+                    numToMove, CmiMyRank(), 
+#if CMK_SMP
+                    0
+#else
+                    1
+#endif
+                    );
+#endif
+            }
+        }
+  }else if(myload > LOADTHRESH)
+  {
+      sum = myload;
+      avgLoad = 0;
+      
+      for (i=0; i<numberoflessthanme; i++){
+          if(sorted_byload[i].load >= avgLoad)
+              break;
+          sum += sorted_byload[i].load;      
+          avgLoad = sum/(i+1);
+      }
+      avgLoad = sum/(i+1);
+      overload = myload - avgLoad;
+      for (k=0; k<i && overload > 0; k++)
+      {
+        j = (sorted_byload[k].index % nNeighbors);
+        if(CpvAccess(neighbors)[j].load < avgLoad)
+        {
+            numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
+            if (numToMove > overload)
+                numToMove = overload;
+            overload -= numToMove;
+            CpvAccess(neighbors)[j].load += numToMove;
 #if CMK_MULTICORE
-          CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
+            CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
 #else
-          CldMultipleSend(CpvAccess(neighbors)[j].pe, 
-                         numToMove, CmiMyRank(), 
+            CldMultipleSend(CpvAccess(neighbors)[j].pe, 
+                numToMove, CmiMyRank(), 
 #if CMK_SMP
-                         0
+                0
 #else
-                         1
+                1
 #endif
-                          );
+                );
 #endif
+
         }
       }
   }
@@ -258,10 +347,17 @@ void CldBalance(void *dummy, double curT)
 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
 #endif
-  CcdCallFnAfterOnPE((CcdVoidFn)CldBalance, NULL, PERIOD, CmiMyPe());
 
+  free(sorted_byload);
+}
+
+void CldBalancePeriod(void *dummy, double curT)
+{
+   CldBalance(NULL, curT);
+   CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, PERIOD, CmiMyPe());
 }
 
+
 void CldLoadResponseHandler(loadmsg *msg)
 {
   int i;
@@ -278,6 +374,9 @@ void CldBalanceHandler(void *msg)
 {
   CldRestoreHandler(msg);
   CldPutToken(msg);
+
+    if(CldLoad() > MAXBEFOREBALANCE)
+        CldBalance(NULL, CmiWallTimer());
 }
 
 void CldHandler(void *msg)
@@ -335,8 +434,6 @@ void CldEnqueue(int pe, void *msg, int infofn)
   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
   CldPackFn pfn;
 
-  //Yanhua Test
-
   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
     avg = CldMinAvg();
     if (CldLoad() < avg)
@@ -362,6 +459,7 @@ void CldEnqueue(int pe, void *msg, int infofn)
       CmiSetInfo(msg,infofn);
       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
       CmiSyncSendAndFree(pe, len, msg);
+    
     }
     else {
       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
@@ -580,7 +678,7 @@ void CldGraphModuleInit(char **argv)
 #if CMK_MULTICORE
     CmiNodeBarrier();
 #endif
-    CldBalance(NULL, CmiWallTimer());
+    CldBalancePeriod(NULL, CmiWallTimer());
   }
 
 #if 1