new random balancer; uses bit vector for active PEs.
authorTerry L. Wilmarth <wilmarth@uiuc.edu>
Fri, 10 Mar 2000 20:39:31 +0000 (20:39 +0000)
committerTerry L. Wilmarth <wilmarth@uiuc.edu>
Fri, 10 Mar 2000 20:39:31 +0000 (20:39 +0000)
src/conv-ldb/cldb.c
src/conv-ldb/cldb.test.c

index 28d335be1b149f4f5f6f19017461b77672a16f6c..977685163c0259a6c190962fb14c95455acf9a36 100644 (file)
@@ -1,10 +1,3 @@
-/*****************************************************************************
- * $Source$
- * $Author$
- * $Date$
- * $Revision$
- *****************************************************************************/
-
 #ifdef WIN32
 #include <stdlib.h>
 #include "queueing.h"
@@ -14,8 +7,10 @@ extern unsigned int CqsLength(Queue);
 #endif
 
 #include "cldb.h"
+#include <math.h>
 
 CpvDeclare(int, CldHandlerIndex);
+CpvDeclare(int, CldPEBitVector);
 CpvDeclare(int, CldBalanceHandlerIndex);
 
 CpvDeclare(int, CldRelocatedMessages);
@@ -51,6 +46,7 @@ static int CsdEstimator(void)
 
 CpvDeclare(int, CldLoadOffset);
 
+
 int CldRegisterInfoFn(CldInfoFn fn)
 {
   return CmiRegisterHandler((CmiHandler)fn);
@@ -146,7 +142,6 @@ void CldPutToken(char *msg)
   /* add token to the scheduler */
   CmiSetHandler(tok, proc->tokenhandleridx);
   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
-  queueing = CQS_QUEUEING_LIFO; 
   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
 }
 
@@ -167,6 +162,43 @@ void CldGetToken(char **msg)
   CpvAccess(CldLoadOffset)++;
 }
 
+/* Bit Vector Stuff */
+
+int CldPresentPE(int pe)
+{
+  int shift = CpvAccess(CldPEBitVector) >> pe;
+  return (shift % 2);
+}
+
+void CldMoveAllSeedsAway()
+{
+  char *msg;
+  int len, queueing, priobits, pe;
+  unsigned int *prioptr;
+  CldInfoFn ifn;  CldPackFn pfn;
+
+  CldGetToken(&msg);
+  while (msg != 0) {
+    ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
+    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+    CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
+    pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
+    while (!CldPresentPE(pe))
+      pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
+    CmiSyncSendAndFree(pe, len, msg);
+    CldGetToken(&msg);
+  }
+}
+
+void CldSetPEBitVector(int newBV)
+{
+  CpvAccess(CldPEBitVector) = newBV;
+  if (!CldPresentPE(CmiMyPe()))
+    CldMoveAllSeedsAway();
+}
+
+/* End Bit Vector Stuff */
+
 void CldModuleGeneralInit()
 {
   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
@@ -174,6 +206,8 @@ void CldModuleGeneralInit()
 
   CpvInitialize(CldProcInfo, CldProc);
   CpvInitialize(int, CldLoadOffset);
+  CpvInitialize(int, CldPEBitVector);
+  CpvAccess(CldPEBitVector) = (int)(pow(2.0, (double)CmiNumPes())) - 1;
   CpvAccess(CldLoadOffset) = 0;
   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
   proc = CpvAccess(CldProc);
@@ -239,3 +273,4 @@ void CldMultipleSend(int pe, int numToSend)
   free(msgs);
   free(msgSizes);
 }
+
index ed7ea4c7d42d2ae885c9a609bc69df8436eb9c2b..c12d99b0b6d72ed9bf877303e190ce6503372416 100644 (file)
@@ -1,26 +1,13 @@
-/*****************************************************************************
- * $Source$
- * $Author$
- * $Date$
- * $Revision$
- *****************************************************************************/
-
-#include <stdio.h>
-
 #ifdef WIN32
 #include "queueing.h"
-extern void CldGetToken(char **);
-extern void CldSwitchHandler(char *, int);
-extern int CldCountTokens();
 extern void CldRestoreHandler(char *);
-extern void CldPutToken(char *);
 extern void CqsEnqueueGeneral(Queue, void *, unsigned int, unsigned int, unsigned int*);
+extern void CldSwitchHandler(char *, int);
 extern void CldModuleGeneralInit();
 #endif
 
 #include "converse.h"
-#define PERIOD 100
-#define MAXMSGBFRSIZE 100000
+#include "cldb.h"
 
 void LoadNotifyFn(int l)
 {
@@ -28,85 +15,7 @@ void LoadNotifyFn(int l)
 
 char *CldGetStrategy(void)
 {
-  return "test";
-}
-
-CpvDeclare(int, CldHandlerIndex);
-CpvDeclare(int, CldBalanceHandlerIndex);
-CpvDeclare(int, CldRelocatedMessages);
-CpvDeclare(int, CldLoadBalanceMessages);
-CpvDeclare(int, CldMessageChunks);
-
-void CldMultipleSend(int pe, int numToSend)
-{
-  void **msgs;
-  int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
-  unsigned int *prioptr;
-  CldInfoFn ifn;
-  CldPackFn pfn;
-
-  if (numToSend == 0)
-    return;
-
-  msgs = (void **)calloc(numToSend, sizeof(void *));
-  msgSizes = (int *)calloc(numToSend, sizeof(int));
-
-  while (!done) {
-    numSent = 0;
-    parcelSize = 0;
-    for (i=0; i<numToSend; i++) {
-      CldGetToken(&msgs[i]);
-      if (msgs[i] != 0) {
-        done = 1;
-        numSent++;
-        ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
-        ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
-        msgSizes[i] = len;
-        parcelSize += len;
-        CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
-      }
-      else {
-        done = 1;
-        break;
-      }
-      if (parcelSize > MAXMSGBFRSIZE) {
-        if (i<numToSend-1)
-          done = 0;
-        numToSend -= numSent;
-        break;
-      }
-    }
-    if (numSent > 1) {
-      CmiMultipleSend(pe, numSent, msgSizes, (char**) msgs);
-      for (i=0; i<numSent; i++)
-        CmiFree(msgs[i]);
-      CpvAccess(CldRelocatedMessages) += numSent;
-      CpvAccess(CldMessageChunks)++;
-    }
-    else if (numSent == 1) {
-      CmiSyncSend(pe, msgSizes[0], msgs[0]);
-      CmiFree(msgs[0]);
-      CpvAccess(CldRelocatedMessages)++;
-      CpvAccess(CldMessageChunks)++;
-    }
-  }
-  free(msgs);
-  free(msgSizes);
-}
-
-void CldDistributeTokens()
-{
-  int destPe = (CmiMyPe()+1)%CmiNumPes();
-  int numToSend;
-
-  numToSend = CldEstimate() / 2;
-  if (numToSend > CldCountTokens())
-    numToSend = CldCountTokens() / 2;
-
-  if (numToSend > 0)
-    CldMultipleSend(destPe, numToSend);
-
-  CcdCallFnAfter((CcdVoidFn)CldDistributeTokens, NULL, PERIOD);
+  return "rand";
 }
 
 void CldBalanceHandler(void *msg)
@@ -116,11 +25,12 @@ void CldBalanceHandler(void *msg)
   CldPutToken(msg);
 }
 
-void CldHandler(void *msg)
+void CldHandler(char *msg)
 {
+  int len, queueing, priobits;
+  unsigned int *prioptr; 
   CldInfoFn ifn; CldPackFn pfn;
-  int len, queueing, priobits; unsigned int *prioptr;
-  
+
   CmiGrabBuffer((void **)&msg);
   CldRestoreHandler(msg);
   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
@@ -134,15 +44,32 @@ void CldEnqueue(int pe, void *msg, int infofn)
   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
   CldPackFn pfn;
 
-  if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
-    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
-    CmiSetInfo(msg,infofn);
-    CldPutToken(msg); 
-  } 
+  if (pe == CLD_ANYWHERE) {
+    pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
+    while (!CldPresentPE(pe))
+      pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
+    if (pe != CmiMyPe())
+      CpvAccess(CldRelocatedMessages)++;
+    if (pe == CmiMyPe()) {
+      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+      CmiSetInfo(msg,infofn);
+      CldPutToken(msg);
+    } 
+    else {
+      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+      if (pfn) {
+       pfn(&msg);
+       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+      }
+      CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
+      CmiSetInfo(msg,infofn);
+      CmiSyncSendAndFree(pe, len, msg);
+    }
+  }
   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
     CmiSetInfo(msg,infofn);
-    CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
+    CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
   }
   else {
     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
@@ -152,34 +79,51 @@ void CldEnqueue(int pe, void *msg, int infofn)
     }
     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
     CmiSetInfo(msg,infofn);
-    if (pe==CLD_BROADCAST) 
-      CmiSyncBroadcastAndFree(len, msg);
-    else if (pe==CLD_BROADCAST_ALL)
-      CmiSyncBroadcastAllAndFree(len, msg);
+    if (pe==CLD_BROADCAST) { CmiSyncBroadcastAndFree(len, msg); }
+    else if (pe==CLD_BROADCAST_ALL) { CmiSyncBroadcastAllAndFree(len, msg); }
     else CmiSyncSendAndFree(pe, len, msg);
   }
 }
 
-void CldHelpModuleInit()
+void CldNodeEnqueue(int node, void *msg, int infofn)
 {
-  CpvInitialize(int, CldBalanceHandlerIndex);
-
-  CpvAccess(CldBalanceHandlerIndex) = 
-    CmiRegisterHandler(CldBalanceHandler);
-
-  if (CmiNumPes() > 1)
-    CldDistributeTokens();
+  int len, queueing, priobits; unsigned int *prioptr;
+  CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
+  CldPackFn pfn;
+  if (node == CLD_ANYWHERE) {
+    /* node = (((rand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes()); */
+    node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
+    if (node != CmiMyNode())
+      CpvAccess(CldRelocatedMessages)++;
+  }
+  if (node == CmiMyNode()) {
+    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+    CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
+  } 
+  else {
+    ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+    if (pfn) {
+      pfn(&msg);
+      ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
+    }
+    CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
+    CmiSetInfo(msg,infofn);
+    if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
+    else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
+    else CmiSyncNodeSendAndFree(node, len, msg);
+  }
 }
 
 void CldModuleInit()
 {
   CpvInitialize(int, CldHandlerIndex);
+  CpvInitialize(int, CldBalanceHandlerIndex);
+  CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
+  CpvAccess(CldBalanceHandlerIndex) = CmiRegisterHandler(CldBalanceHandler);
   CpvInitialize(int, CldRelocatedMessages);
   CpvInitialize(int, CldLoadBalanceMessages);
   CpvInitialize(int, CldMessageChunks);
-  CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
     CpvAccess(CldMessageChunks) = 0;
   CldModuleGeneralInit();
-  CldHelpModuleInit();
 }