doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-com / PrioStreaming.C
index c32d950abf7e433f788da0ec907bada29869109c..b6411ad86e0ede7f6549b53573e9c1bd3b1a0114 100644 (file)
@@ -1,8 +1,15 @@
+/**
+   @addtogroup ComlibCharmStrategy
+   @{
+   @file 
+*/
+
 #include "PrioStreaming.h"
-#include "MsgPacker.h"
+//#include "MsgPacker.h"
 
-PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio)
-    : StreamingStrategy(periodMs, bufferMax_), basePriority(prio)
+PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio, 
+                            int msgSizeMax_, int bufSizeMax_)
+    : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), CharmStrategy(), basePriority(prio)
 {
 }
 
@@ -12,25 +19,34 @@ void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
                  PERIOD, bufferMax);
 
     int pe=cmsg->dest_proc;
-    streamingMsgBuf[pe].enq(cmsg);
-    streamingMsgCount[pe]++;
-    if (streamingMsgCount[pe] > bufferMax) {
-        flushPE(pe);
-        return;
-    }
-    
     char* msg = cmsg->getCharmMessage();
     envelope *env = UsrToEnv(msg);
     int msg_prio = *(int*)env->getPrioPtr();
-    
+
+    if(streamingMsgCount[pe] == 0) 
+        minPrioVec[pe] = msg_prio;
+    else if(minPrioVec[pe] > msg_prio)
+        minPrioVec[pe] = msg_prio;
+
+    streamingMsgBuf[pe].enq(cmsg);
+    streamingMsgCount[pe]++;   
+    bufSize[pe]+=cmsg->getSize();
+
     if(msg_prio <= basePriority)
         flushPE(pe);
+
+    if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax) 
+        flushPE(pe);
 }
 
 void PrioStreaming::pup(PUP::er &p){
 
     StreamingStrategy::pup(p);
-
+    CharmStrategy::pup(p);
     p | basePriority;
 
+    if(p.isPacking() || p.isUnpacking())
+        minPrioVec.resize(CkNumPes());
 }
+
+/*@}*/