added +WSPriority to workstealing seedlb, which steal higher priority tasks.
authorGengbin Zheng <gzheng@illinois.edu>
Wed, 11 May 2011 19:00:00 +0000 (14:00 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Wed, 11 May 2011 19:00:00 +0000 (14:00 -0500)
src/conv-ldb/cldb.c
src/conv-ldb/cldb.workstealing.c

index a4427fefbb7472a38f334a78dfa56ae8d35f175c..482e4c72166311bda6dda70531c1548199e32c1e 100644 (file)
@@ -203,13 +203,13 @@ void CldPutTokenPrio(char *msg)
     ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
     ints1 = (priobits1+CINTBITS-1)/CINTBITS;
 
-    if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { ptr=ptr->pred; break;}
+    if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
     ptr = ptr->succ;
   }
 
   /* add token to the doubly-linked circle */
-  tok->pred = ptr->pred;
   tok->succ = ptr;
+  tok->pred = ptr->pred;
   tok->pred->succ = tok;
   tok->succ->pred = tok;
   proc->load ++;
@@ -268,6 +268,39 @@ void CldGetTokenFromRank(char **msg, int rank)
   CmiUnlock(cldlock);
 }
 
+static 
+#if CMK_C_INLINE
+inline 
+#endif
+void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
+{
+  void *msg;
+  
+  if (tok == proc->sentinel) return NULL;
+  tok->pred->succ = tok->succ;
+  tok->succ->pred = tok->pred;
+  proc->load --;
+  msg = tok->msg;
+  tok->msg = 0;
+  return msg;
+}
+
+/* called at node level */
+/* get token from processor of rank pe */
+static 
+#if CMK_C_INLINE
+inline 
+#endif
+void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
+{
+  CldProcInfo proc = CpvAccessOther(CldProc, rank);
+  CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
+  CmiLock(cldlock);
+  *msg = _CldGetTokenMsgAt(proc, tok);
+  if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
+  CmiUnlock(cldlock);
+}
+
 /* Bit Vector Stuff */
 
 int CldPresentPE(int pe)
@@ -404,6 +437,62 @@ void CldMultipleSend(int pe, int numToSend, int rank, int immed)
   free(msgSizes);
 }
 
+/* function can be called in an immediate handler at node level
+   rank specify the rank of processor for the node to represent
+   This function can also send as immeidate messages
+*/
+void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
+{
+  char **msgs;
+  int len, queueing, priobits, *msgSizes, i;
+  unsigned int *prioptr;
+  CldInfoFn ifn;
+  CldPackFn pfn;
+  CldToken tok;
+  CldProcInfo proc = CpvAccess(CldProc);
+  int count = 0;
+
+  if (numToSend ==0) return;
+  msgs = (char **)calloc(numToSend, sizeof(char *));
+  msgSizes = (int *)calloc(numToSend, sizeof(int));
+
+  tok = proc->sentinel->succ;
+  if (tok == proc->sentinel) return;
+  tok = tok->succ;
+  while (tok!=proc->sentinel) {
+    tok = tok->succ;
+    if (tok == proc->sentinel) break;
+    CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
+    if (msgs[i] != 0) {
+       ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
+       ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
+       msgSizes[count] = len;
+       CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
+        if (immed) CmiBecomeImmediate(msgs[count]);
+        count ++;
+    }
+    tok = tok->succ;
+  }
+  if (count > 1) {
+      if (immed)
+        CmiMultipleIsend(pe, count, msgSizes, msgs);
+      else
+        CmiMultipleSend(pe, count, msgSizes, msgs);
+      for (i=0; i<count; i++)
+       CmiFree(msgs[i]);
+      CpvAccessOther(CldRelocatedMessages, rank) += count;
+      CpvAccessOther(CldMessageChunks, rank)++;
+  }
+  else if (count == 1) {
+      if (immed) CmiBecomeImmediate(msgs[0]);
+      CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
+      CpvAccessOther(CldRelocatedMessages, rank)++;
+      CpvAccessOther(CldMessageChunks, rank)++;
+  }
+  free(msgs);
+  free(msgSizes);
+}
+
 /* simple scheme - just send one by one. useful for multicore */
 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
 {
index b611ccd2ff72fef270d2d025f1c4471fa0159e2c..1327dd3dba7bf152210d522bc5e3c088e7553680 100644 (file)
@@ -15,6 +15,7 @@ typedef struct CldProcInfo_s {
 } *CldProcInfo;
 
 static int WS_Threshold = -1;
+static int _steal_prio = 0;
 static int _stealonly1 = 0;
 static int _steal_immediate = 0;
 static int workstealingproactive = 0;
@@ -107,12 +108,16 @@ static void CldAskLoadHandler(requestmsg *msg)
   if (myload>LOADTHRESH) {
       if(_stealonly1) sendLoad = 1;
       else sendLoad = myload/2; 
-      if(sendLoad > 0)
+      if(sendLoad > 0) {
 #if ! CMK_USE_IBVERBS
+        if (_steal_prio)
+          CldMultipleSendPrio(receiver, sendLoad, rank, 0);
+        else
           CldMultipleSend(receiver, sendLoad, rank, 0);
 #else
           CldSimpleMultipleSend(receiver, sendLoad, rank);
 #endif
+      }
       CmiFree(msg);
   }else
   {     /* send ack indicating there is no task */
@@ -177,10 +182,16 @@ void CldHandler(void *msg)
   /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
 }
 
+#define CldPUTTOKEN(msg)  \
+           if (_steal_prio)   \
+             CldPutTokenPrio(msg);   \
+           else            \
+             CldPutToken(msg);
+
 void CldBalanceHandler(void *msg)
 {
   CldRestoreHandler(msg);
-  CldPutToken(msg);
+  CldPUTTOKEN(msg);
   CpvAccess(isStealing) = 0;      /* fixme: this may not be right */
 }
 
@@ -232,7 +243,7 @@ void CldEnqueue(int pe, void *msg, int infofn)
     }
     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
     CmiSetInfo(msg,infofn);
-    CldPutToken(msg);
+    CldPUTTOKEN(msg);
   } 
   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
@@ -318,6 +329,8 @@ void CldGraphModuleInit(char **argv)
 
   _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
 
+  _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
+
   /* register idle handlers - when idle, keep asking work from neighbors */
   if(CmiNumPes() > 1)
     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,