Added strategy parameters for maximum buffer size and maximum message size
authorLukasz Wesolowski <wesolwsk@illinois.edu>
Sun, 15 Apr 2007 04:09:01 +0000 (04:09 +0000)
committerLukasz Wesolowski <wesolwsk@illinois.edu>
Sun, 15 Apr 2007 04:09:01 +0000 (04:09 +0000)
src/ck-com/PrioStreaming.C
src/ck-com/PrioStreaming.h
src/ck-com/StreamingStrategy.C
src/ck-com/StreamingStrategy.h

index 9a68a83835759e515bdfe683aea43cde86967a05..bd0f0a715961891678afc1de3911ce3d282e181d 100644 (file)
@@ -1,8 +1,9 @@
 #include "PrioStreaming.h"
 #include "MsgPacker.h"
 
-PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio)
-    : StreamingStrategy(periodMs, bufferMax_), basePriority(prio)
+PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio, 
+                            int msgSizeMax_, int bufSizeMax_)
+    : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), basePriority(prio)
 {
 }
 
@@ -23,11 +24,12 @@ void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
 
     streamingMsgBuf[pe].enq(cmsg);
     streamingMsgCount[pe]++;   
+    bufSize[pe]+=cmsg->getSize();
 
     if(msg_prio <= basePriority)
         flushPE(pe);
 
-    if (streamingMsgCount[pe] > bufferMax) 
+    if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax
         flushPE(pe);
 }
 
index 714aeb459a210382c3497215f5bb35c0b13ec85c..a6ee3a97d55004494914b3efb86f1974c96bd216 100644 (file)
@@ -27,7 +27,11 @@ class PrioStreaming : public StreamingStrategy {
      - Current message is a high priority message
     */
 
-    PrioStreaming(int periodMs=10, int bufferMax=1000, int prio=0);
+    PrioStreaming(int periodMs=DEFAULT_TIMEOUT, 
+                 int bufferMax=MAX_NUM_STREAMING_MESSAGES, 
+                 int prio=0,
+                 int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE,
+                 int bufSizeMAX=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
     PrioStreaming(CkMigrateMessage *m) : StreamingStrategy(m) {}
     
     virtual void insertMessage(CharmMessageHolder *msg);
index 142f095627e192974ae9278a7c4c3ceb4bcd3b87..80e2ebeefa3f8ec799cae95df635c29977cf836a 100644 (file)
@@ -19,22 +19,28 @@ void StreamingHandlerFn(void *msg) {
     return;
 }
 
-StreamingStrategy::StreamingStrategy(int periodMs, int bufferMax_)
-    : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
+StreamingStrategy::StreamingStrategy(int periodMs, int bufferMax_, 
+                                    int msgSizeMax_, int bufSizeMax_)
+    : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_), 
+      bufSizeMax(bufSizeMax_), CharmStrategy()
 {
-    streamingMsgBuf=NULL;
-    streamingMsgCount=NULL;
+    streamingMsgBuf = NULL;
+    streamingMsgCount = NULL;
+    bufSize = NULL;
     shortMsgPackingFlag = CmiFalse;
     idleFlush = CmiTrue;
     streaming_handler_id = 0;
     setType(ARRAY_STRATEGY);
 }
 
-StreamingStrategy::StreamingStrategy(double periodMs, int bufferMax_)
-    : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
+StreamingStrategy::StreamingStrategy(double periodMs, int bufferMax_, 
+                                    int msgSizeMax_, int bufSizeMax_)
+    : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_), 
+      bufSizeMax(bufSizeMax_), CharmStrategy()
 {
     streamingMsgBuf = NULL;
     streamingMsgCount = NULL;
+    bufSize = NULL;
     shortMsgPackingFlag = CmiFalse;
     idleFlush = CmiTrue;
     streaming_handler_id = 0;
@@ -48,19 +54,22 @@ void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
     envelope *env = UsrToEnv(msg);
     int size = env->getTotalsize();
 
-    if(size > MAX_STREAMING_MESSAGE_SIZE) {//AVOID COPYING
-        ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
+    if(size > msgSizeMax) {//AVOID COPYING
+        ComlibPrintf("StreamingStrategy::inserSessage: direct send\n");
         CmiSyncSendAndFree(pe, size, (char *)env);
         delete cmsg;
         return;
     }
 
-    ComlibPrintf("StreamingStrategy::insertMessage: buffering t=%g, n=%d, s=%d\n",  
-                 PERIOD, bufferMax, size);
+    ComlibPrintf("[%d] StreamingStrategy::insertMessage: buffering t=%g, n=%d, d=%d, s=%d\n",  
+                 CkMyPe(), PERIOD, bufferMax, pe, size);
     
     streamingMsgBuf[pe].enq(cmsg);
     streamingMsgCount[pe]++;
-    if (streamingMsgCount[pe] > bufferMax) flushPE(pe);
+    bufSize[pe]+=cmsg->getSize();
+    if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax) {
+      flushPE(pe);
+    }
 }
 
 void StreamingStrategy::doneInserting() {
@@ -88,6 +97,7 @@ void StreamingStrategy::flushPE(int pe) {
                    CkMyPe(), streamingMsgCount[pe], pe); 
       CmiSyncSendAndFree(pe, size, (char *)msg);
       streamingMsgCount[pe] = 0;
+      bufSize[pe] = 0;
   }
   else {
       
@@ -103,6 +113,7 @@ void StreamingStrategy::flushPE(int pe) {
                      CkMyPe(), pe);            
         delete cmsg;
         streamingMsgCount[pe] = 0;
+       bufSize[pe] = 0;
         return;
     }
     /*
@@ -183,6 +194,7 @@ void StreamingStrategy::flushPE(int pe) {
     }    
     
     streamingMsgCount[pe] = 0;
+    bufSize[pe] = 0;
     CmiSetHandler(newmsg, streaming_handler_id);
     CmiSyncSendAndFree(pe, sp.size(), newmsg); 
   }
@@ -254,15 +266,20 @@ void StreamingStrategy::pup(PUP::er &p){
   CharmStrategy::pup(p);
   p | PERIOD;
   p | bufferMax;
+  p | msgSizeMax;
   p | shortMsgPackingFlag;
+  p | bufSizeMax;
   p | idleFlush;
   p | streaming_handler_id;
 
   if(p.isUnpacking()) {
       streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
       streamingMsgCount = new int[CkNumPes()];
-      for(int count = 0; count < CkNumPes(); count ++)
-          streamingMsgCount[count] = 0;
+      bufSize = new int[CkNumPes()];
+      for(int count = 0; count < CkNumPes(); count ++) {
+       streamingMsgCount[count] = 0;
+       bufSize[count] = 0;
+      }
   }
 }
 
index 1257d3feda0b1dce4a7b509704fb4c447dfba38b..22597bf60992f9c46e58029edc5860fe3d590f62 100644 (file)
@@ -3,12 +3,17 @@
 #include "ComlibManager.h"
 
 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
+#define MAX_NUM_STREAMING_MESSAGES 1000
+#define DEFAULT_TIMEOUT 10
 
 class StreamingStrategy : public CharmStrategy {
  protected:
     CkQ<CharmMessageHolder *> *streamingMsgBuf;
     int *streamingMsgCount;
+    int *bufSize;
     int bufferMax;
+    int msgSizeMax;
+    int bufSizeMax;
     double PERIOD;
     CmiBool shortMsgPackingFlag, idleFlush;
 
@@ -23,11 +28,19 @@ class StreamingStrategy : public CharmStrategy {
      These are the criteria for flushing all pending messages:
        - it's been at least period (in ms) since the last flush, or
        - the processor just went idle.
-     Thses criteria flush a single PE's pending messages:
-       - more than bufferMax messages to buffered for one PE.
+     These criteria flush a single PE's pending messages:
+       - more than bufferMax messages to buffered for one PE, or
+       - max buffer size reached
+     Messages above the size threshold are sent directly without using the strategy  .
     */
-    StreamingStrategy(int periodMs=1, int bufferMax=1000);
-    StreamingStrategy(double periodMs=1.0, int bufferMax=1000);
+    StreamingStrategy(int periodMs=DEFAULT_TIMEOUT, 
+                     int bufferMax=MAX_NUM_STREAMING_MESSAGES,
+                     int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE, 
+                     int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
+    StreamingStrategy(double periodMs=DEFAULT_TIMEOUT, 
+                     int bufferMax=MAX_NUM_STREAMING_MESSAGES, 
+                     int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE, 
+                     int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
 
     StreamingStrategy(CkMigrateMessage *m) : CharmStrategy(m) {}