Updated cldb with additional functions.
authorTerry L. Wilmarth <wilmarth@uiuc.edu>
Tue, 1 Jun 1999 19:38:20 +0000 (19:38 +0000)
committerTerry L. Wilmarth <wilmarth@uiuc.edu>
Tue, 1 Jun 1999 19:38:20 +0000 (19:38 +0000)
src/conv-ldb/cldb.c

index aeee7a1fa1037e64aee8ae08ec2b5ca51fe1e6c1..483ec07a651e6689d4ed1b44c5305576d971ab96 100644 (file)
@@ -1,31 +1,15 @@
-#include "converse.h"
+#include "cldb.h"
 
+/* Estimator stuff.  Of any use? */
 /*
- * CldSwitchHandler takes a message and a new handler number.  It
- * changes the handler number to the new handler number and move the
- * old to the Xhandler part of the header.  When the message gets
- * handled, the handler should call CldRestoreHandler to put the old
- * handler back.
- *
- * CldPutToken puts a message in the scheduler queue in such a way
- * that it can be retreived from the queue.  Once the message gets
- * handled, it can no longer be retreived.  CldGetToken removes a
- * message that was placed in the scheduler queue in this way.
- * CldCountTokens tells you how many tokens are currently retreivable.
- */
-
-typedef struct {
-  int count;
-  CldEstimator fns[16];
-} CldEstimatorTable;
 CpvStaticDeclare(CldEstimatorTable, _estfns);
-
+*/
 void CldRegisterEstimator(CldEstimator fn)
 {
-  CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;
+  /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
 }
 
+/* 
 int CldEstimate(void)
 {
   CldEstimatorTable *estab = &(CpvAccess(_estfns));
@@ -39,6 +23,9 @@ static int CsdEstimator(void)
 {
   return CsdLength();
 }
+*/
+
+CpvDeclare(int, CldLoadOffset);
 
 int CldRegisterInfoFn(CldInfoFn fn)
 {
@@ -50,6 +37,19 @@ int CldRegisterPackFn(CldPackFn fn)
   return CmiRegisterHandler((CmiHandler)fn);
 }
 
+/* CldSwitchHandler takes a message and a new handler number.  It
+ * changes the handler number to the new handler number and move the
+ * old to the Xhandler part of the header.  When the message gets
+ * handled, the handler should call CldRestoreHandler to put the old
+ * handler back.
+ *
+ * CldPutToken puts a message in the scheduler queue in such a way
+ * that it can be retreived from the queue.  Once the message gets
+ * handled, it can no longer be retreived.  CldGetToken removes a
+ * message that was placed in the scheduler queue in this way.
+ * CldCountTokens tells you how many tokens are currently retreivable.  
+*/
+
 void CldSwitchHandler(char *cmsg, int handler)
 {
   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
@@ -61,11 +61,11 @@ void CldRestoreHandler(char *cmsg)
   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
 }
 
-void Cldhandler(void *);
+void Cldhandler(char *);
  
 typedef struct CldToken_s {
   char msg_header[CmiMsgHeaderSizeBytes];
-  void *msg;  /* if null, message already removed */
+  char *msg;  /* if null, message already removed */
   struct CldToken_s *pred;
   struct CldToken_s *succ;
 } *CldToken;
@@ -88,15 +88,21 @@ static void CldTokenHandler(CldToken tok)
     proc->load --;
     CmiHandleMessage(tok->msg);
   }
+  else 
+    CpvAccess(CldLoadOffset)--;
 }
 
 int CldCountTokens()
 {
-  CldProcInfo proc = CpvAccess(CldProc);
-  return proc->load;
+  return (CpvAccess(CldProc)->load);
 }
 
-void CldPutToken(void *msg)
+int CldLoad()
+{
+  return (CsdLength() - CpvAccess(CldLoadOffset));
+}
+
+void CldPutToken(char *msg)
 {
   CldProcInfo proc = CpvAccess(CldProc);
   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
@@ -121,10 +127,11 @@ void CldPutToken(void *msg)
   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
 }
 
-void CldGetToken(void **msg)
+void CldGetToken(char **msg)
 {
   CldProcInfo proc = CpvAccess(CldProc);
   CldToken tok;
+  
   tok = proc->sentinel->succ;
   if (tok == proc->sentinel) {
     *msg = 0; return;
@@ -134,6 +141,7 @@ void CldGetToken(void **msg)
   proc->load --;
   *msg = tok->msg;
   tok->msg = 0;
+  CpvAccess(CldLoadOffset)++;
 }
 
 void CldModuleGeneralInit()
@@ -142,9 +150,8 @@ void CldModuleGeneralInit()
   CldProcInfo proc;
 
   CpvInitialize(CldProcInfo, CldProc);
-  CpvInitialize(CldEstimatorTable, _estfns);
-  CpvAccess(_estfns).count = 0;
-  CldRegisterEstimator(CsdEstimator);
+  CpvInitialize(int, CldLoadOffset);
+  CpvAccess(CldLoadOffset) = 0;
   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
   proc = CpvAccess(CldProc);
   proc->load = 0;
@@ -153,3 +160,59 @@ void CldModuleGeneralInit()
   sentinel->succ = sentinel;
   sentinel->pred = sentinel;
 }
+
+void CldMultipleSend(int pe, int numToSend)
+{
+  char **msgs;
+  int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
+  unsigned int *prioptr;
+  CldInfoFn ifn;
+  CldPackFn pfn;
+
+  if (numToSend == 0)
+    return;
+  msgs = (char **)calloc(numToSend, sizeof(char *));
+  msgSizes = (int *)calloc(numToSend, sizeof(int));
+
+  while (!done) {
+    numSent = 0;
+    parcelSize = 0;
+    for (i=0; i<numToSend; i++) {
+      CldGetToken(&msgs[i]);
+      if (msgs[i] != 0) {
+       done = 1;
+       numSent++;
+       ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
+       ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
+       msgSizes[i] = len;
+       parcelSize += len;
+       CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
+      }
+      else {
+       done = 1;
+       break;
+      }
+      if (parcelSize > MAXMSGBFRSIZE) {
+       if(i<numToSend-1)
+         done = 0;
+       numToSend -= numSent;
+       break;
+      }
+    }
+    if (numSent > 1) {
+      CmiMultipleSend(pe, numSent, msgSizes, msgs);
+      for (i=0; i<numSent; i++)
+       CmiFree(msgs[i]);
+      CpvAccess(CldRelocatedMessages) += numSent;
+      CpvAccess(CldMessageChunks)++;
+    }
+    else if (numSent == 1) {
+      CmiSyncSend(pe, msgSizes[0], msgs[0]);
+      CmiFree(msgs[0]);
+      CpvAccess(CldRelocatedMessages)++;
+      CpvAccess(CldMessageChunks)++;
+    }
+  }
+  free(msgs);
+  free(msgSizes);
+}