implemented the immediate message scheme of idle local balancing - idle processor...
authorGengbin Zheng <gzheng@illinois.edu>
Wed, 26 Nov 2003 03:25:05 +0000 (03:25 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Wed, 26 Nov 2003 03:25:05 +0000 (03:25 +0000)
src/conv-ldb/cldb.c
src/conv-ldb/cldb.h
src/conv-ldb/cldb.neighbor.c
src/conv-ldb/cldb.neighbor.h

index a90a569dd98b00d70f54589ce22cec87fad9c3ef..8b56ead8eb100ffedba6f3c3cc2a4394ada04d3a 100644 (file)
@@ -14,6 +14,9 @@ CpvDeclare(int, CldRelocatedMessages);
 CpvDeclare(int, CldLoadBalanceMessages);
 CpvDeclare(int, CldMessageChunks);
 CpvDeclare(int, CldLoadNotify);
+
+CpvDeclare(CmiNodeLock, cldLock);
+
 extern void LoadNotifyFn(int);
 
 char* _lbtopo = "torus2d";
@@ -127,10 +130,12 @@ void CldPutToken(char *msg)
 {
   CldProcInfo proc = CpvAccess(CldProc);
   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
-  CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
+  CldToken tok;
   int len, queueing, priobits; unsigned int *prioptr;
   CldPackFn pfn;
 
+  CmiLock(CpvAccess(cldLock));
+  tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
   tok->msg = msg;
 
   /* add token to the doubly-linked circle */
@@ -143,23 +148,44 @@ void CldPutToken(char *msg)
   CmiSetHandler(tok, proc->tokenhandleridx);
   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
+  CmiUnlock(CpvAccess(cldLock));
 }
 
-void CldGetToken(char **msg)
+static void * _CldGetTokenMsg(CldProcInfo proc)
 {
-  CldProcInfo proc = CpvAccess(CldProc);
   CldToken tok;
+  void *msg;
   
   tok = proc->sentinel->succ;
   if (tok == proc->sentinel) {
-    *msg = 0; return;
+    return NULL;
   }
   tok->pred->succ = tok->succ;
   tok->succ->pred = tok->pred;
   proc->load --;
-  *msg = tok->msg;
+  msg = tok->msg;
   tok->msg = 0;
-  CpvAccess(CldLoadOffset)++;
+  return msg;
+}
+
+void CldGetToken(char **msg)
+{
+  CldProcInfo proc = CpvAccess(CldProc);
+  CmiLock(CpvAccess(cldLock));
+  *msg = _CldGetTokenMsg(proc);
+  if (*msg) CpvAccess(CldLoadOffset)++;
+  CmiUnlock(CpvAccess(cldLock));
+}
+
+/* called at node level */
+/* get token from processor of rank pe */
+static void CldGetTokenFromRank(char **msg, int rank)
+{
+  CldProcInfo proc = CpvAccessOther(CldProc, rank);
+  CmiLock(CpvAccessOther(cldLock, rank));
+  *msg = _CldGetTokenMsg(proc);
+  if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
+  CmiUnlock(CpvAccessOther(cldLock, rank));
 }
 
 /* Bit Vector Stuff */
@@ -211,8 +237,8 @@ void CldModuleGeneralInit(char **argv)
 
   CpvInitialize(CldProcInfo, CldProc);
   CpvInitialize(int, CldLoadOffset);
-  CpvInitialize(int, CldLoadNotify);
   CpvAccess(CldLoadOffset) = 0;
+  CpvInitialize(int, CldLoadNotify);
   CpvInitialize(BitVector, CldPEBitVector);
   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
   for (i=0; i<CmiNumPes(); i++)
@@ -225,10 +251,15 @@ void CldModuleGeneralInit(char **argv)
   sentinel->succ = sentinel;
   sentinel->pred = sentinel;
 
+  /* lock to protect token queue for immediate message and smp */
+  CpvInitialize(CmiNodeLock, cldLock);
+  CpvAccess(cldLock) = CmiCreateLock();
+
+  /* register load balancing virtual topologies */
   registerLBTopos();
 }
 
-void CldMultipleSend(int pe, int numToSend)
+void CldMultipleSend(int pe, int numToSend, int rank)
 {
   char **msgs;
   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
@@ -245,7 +276,7 @@ void CldMultipleSend(int pe, int numToSend)
     numSent = 0;
     parcelSize = 0;
     for (i=0; i<numToSend; i++) {
-      CldGetToken(&msgs[i]);
+      CldGetTokenFromRank(&msgs[i], rank);
       if (msgs[i] != 0) {
        done = 1;
        numSent++;
@@ -253,7 +284,7 @@ void CldMultipleSend(int pe, int numToSend)
        ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
        msgSizes[i] = len;
        parcelSize += len;
-       CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
+       CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
       }
       else {
        done = 1;
@@ -270,14 +301,14 @@ void CldMultipleSend(int pe, int numToSend)
       CmiMultipleSend(pe, numSent, msgSizes, msgs);
       for (i=0; i<numSent; i++)
        CmiFree(msgs[i]);
-      CpvAccess(CldRelocatedMessages) += numSent;
-      CpvAccess(CldMessageChunks)++;
+      CpvAccessOther(CldRelocatedMessages, rank) += numSent;
+      CpvAccessOther(CldMessageChunks, rank)++;
     }
     else if (numSent == 1) {
       CmiSyncSend(pe, msgSizes[0], msgs[0]);
       CmiFree(msgs[0]);
-      CpvAccess(CldRelocatedMessages)++;
-      CpvAccess(CldMessageChunks)++;
+      CpvAccessOther(CldRelocatedMessages, rank)++;
+      CpvAccessOther(CldMessageChunks, rank)++;
     }
   }
   free(msgs);
index f6fbd58a4e7f4415fdbcb28b14bbc50a42faaf5a..5b08c95676d6ed2a926eaa482d0eaa8b228dca8f 100644 (file)
@@ -11,7 +11,9 @@ CpvExtern(int, CldLoadBalanceMessages);
 CpvExtern(int, CldMessageChunks);
 CpvExtern(int, CldLoadNotify);
 
-void CldMultipleSend(int pe, int numToSend);
+CpvExtern(CmiNodeLock, cldLock);
+
+void CldMultipleSend(int pe, int numToSend, int rank);
 void CldSetPEBitVector(const char *);
 
 int  CldLoad(void);
index 032110f84b218414540da86e6e723096a46931da..e415f1e52636a1454005e2863f826dd6296bda4b 100644 (file)
@@ -1,17 +1,23 @@
 #include <stdlib.h>
-#include "cldb.neighbor.h"
-#define PERIOD 20                /* default: 30 */
-#define MAXOVERLOAD 1
 
 #include "converse.h"
+#include "cldb.neighbor.h"
 #include "queueing.h"
 #include "cldb.h"
 #include "topology.h"
 
+#define IDLE_IMMEDIATE                 1
+#define TRACE_USEREVENTS        0
+
+#define PERIOD 20                /* default: 30 */
+#define MAXOVERLOAD 1
+
 typedef struct CldProcInfo_s {
-  double lastIdle;
+  double lastCheck;
+  int    sent;                 /* flag to disable idle work request */
   int    balanceEvt;           /* user event for balancing */
   int    idleEvt;              /* user event for idle balancing */
+  int    idleprocEvt;          /* user event for processing idle req */
 } *CldProcInfo;
 
 extern char *_lbtopo;                  /* topology name string */
@@ -27,6 +33,8 @@ CpvDeclare(int, Mindex);
 
 void LoadNotifyFn(int l)
 {
+  CldProcInfo  cldData = CpvAccess(CldData);
+  cldData->sent = 0;
 }
 
 char *CldGetStrategy(void)
@@ -37,58 +45,87 @@ char *CldGetStrategy(void)
 /* since I am idle, ask for work from neighbors */
 static void CldBeginIdle(void *dummy)
 {
-  CpvAccess(CldData)->lastIdle = CmiWallTimer();
+  CpvAccess(CldData)->lastCheck = CmiWallTimer();
 }
 
 static void CldEndIdle(void *dummy)
 {
-  CpvAccess(CldData)->lastIdle = -1;
+  CpvAccess(CldData)->lastCheck = -1;
 }
 
 static void CldStillIdle(void *dummy)
 {
+  int i;
   double startT;
-  loadmsg msg;
+  requestmsg msg;
   int myload;
   CldProcInfo  cldData = CpvAccess(CldData);
 
-  double t = CmiWallTimer();
-  double lt = cldData->lastIdle;
-  /* only ask for work every 5ms */
-  if (lt!=-1 && t-lt<0.005) {
-    return;
-  }
-  cldData->lastIdle = t;
-
-#ifndef CMK_OPTIMIZE
-  startT = CmiWallTimer();
-#endif
+  double now = CmiWallTimer();
+  double lt = cldData->lastCheck;
+  /* only ask for work every 20ms */
+  if (cldData->sent && (lt!=-1 && now-lt< 0.020)) return;
+  cldData->lastCheck = now;
 
   myload = CldLoad();
-/*  CmiAssert(myload == 0); */
+  CmiAssert(myload == 0);
   if (myload > 0) return;
 
-  msg.pe = CmiMyPe();
-  msg.load = myload;
+  msg.from_pe = CmiMyPe();
   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
-  CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
+#if ! IDLE_IMMEDIATE
+  msg.to_rank = -1;
+  CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
+#else
+  /* fixme */
+  CmiBecomeImmediate(&msg);
+  for (i=0; i<CpvAccess(numNeighbors); i++) {
+    msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
+    CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
+  }
+#endif
+  cldData->sent = 1;
 
-#ifndef CMK_OPTIMIZE
-  /* traceUserBracketEvent(cldData->idleEvt, startT, CmiWallTimer()); */
+#if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
+  traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
 #endif
 }
 
-static void CldAskLoadHandler(loadmsg *msg)
+/* immediate message handler, work at node level */
+/* send some work to requested proc */
+static void CldAskLoadHandler(requestmsg *msg)
 {
-  /* send some work to this proc */
-  int receiver = msg->pe;
+  int receiver, rank;
   int myload = CldLoad();
+  double now = CmiWallTimer();
 
+  /* only give you work if I have more than 1 */
   if (myload>1) {
-    int sendLoad = myload / CpvAccess(numNeighbors) / 2;
+    int sendLoad;
+    receiver = msg->from_pe;
+    rank = CmiMyRank();
+    if (msg->to_rank != -1) rank = msg->to_rank;
+#if IDLE_IMMEDIATE
+    /* try the lock */
+    if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
+      CmiDelayImmediate();             /* postpone immediate message */
+      return;
+    }
+    CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
+#endif
+    sendLoad = myload / CpvAccess(numNeighbors) / 2;
     if (sendLoad < 1) sendLoad = 1;
     sendLoad = 1;
-    CldMultipleSend(receiver, sendLoad);
+    CldMultipleSend(receiver, sendLoad, rank);
+#if 0
+#if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
+    /* this is dangerous since projections logging is not thread safe */
+    {
+    CldProcInfo  cldData = CpvAccessOther(CldData, rank);
+    traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
+    }
+#endif
+#endif
   }
   CmiFree(msg);
 }
@@ -168,16 +205,15 @@ void CldBalance()
             numToMove = overload;
           overload -= numToMove;
          CpvAccess(neighbors)[j].load += numToMove;
-          CldMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
+          CldMultipleSend(CpvAccess(neighbors)[j].pe, numToMove, CmiMyRank());
         }
       }
   }
   CldSendLoad();
-#ifndef CMK_OPTIMIZE
-/*  traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer()); */
+#if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
+  traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
 #endif
   CcdCallFnAfterOnPE((CcdVoidFn)CldBalance, NULL, PERIOD, CmiMyPe());
-/*  CcdCallBacksReset(0); */
 }
 
 void CldLoadResponseHandler(loadmsg *msg)
@@ -414,10 +450,12 @@ void CldGraphModuleInit(char **argv)
   CpvInitialize(int, CldAskLoadHandlerIndex);
 
   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
-  CpvAccess(CldData)->lastIdle = -1;
+  CpvAccess(CldData)->lastCheck = -1;
+  CpvAccess(CldData)->sent = 0;
 #ifndef CMK_OPTIMIZE
   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
+  CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
 #endif
 
   CpvAccess(MinLoad) = 0;
@@ -461,17 +499,13 @@ void CldGraphModuleInit(char **argv)
     CldBalance();
   }
 
-#if 0
-  /* register an idle handler */
+#if 1
+  /* register idle handlers - when idle, keep asking work from neighbors */
   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
       (CcdVoidFn) CldStillIdle, NULL);
   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
       (CcdVoidFn) CldStillIdle, NULL);
 #endif
-#if 0
-  /* periodic load balancing */
-  CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) CldBalance, NULL);
-#endif
 }
 
 void CldModuleInit(char **argv)
@@ -484,6 +518,8 @@ void CldModuleInit(char **argv)
   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
   CpvAccess(CldMessageChunks) = 0;
 
+  CpvAccess(CldLoadNotify) = 1;
+
   CldModuleGeneralInit(argv);
   CldGraphModuleInit(argv);
 }
index 766ebd5ccf5aa7f73aaac2a47fb3e43f71b7c267..66dd3cea3b9389a4a5af39da2e14e9dbd1ad3dcf 100644 (file)
@@ -19,9 +19,11 @@ typedef struct loadmsg_s {
   int pe, load;
 } loadmsg;
 
+/* work request message when idle */
 typedef struct requestmsg_s {
   char header[CmiMsgHeaderSizeBytes];
-  int pe;
+  int from_pe;
+  int to_rank;
 } requestmsg;
 
 CpvDeclare(CldNeighborData, neighbors);