doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-com / PrioStreaming.C
index 9a68a83835759e515bdfe683aea43cde86967a05..b6411ad86e0ede7f6549b53573e9c1bd3b1a0114 100644 (file)
@@ -1,8 +1,15 @@
+/**
+   @addtogroup ComlibCharmStrategy
+   @{
+   @file 
+*/
+
 #include "PrioStreaming.h"
-#include "MsgPacker.h"
+//#include "MsgPacker.h"
 
-PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio)
-    : StreamingStrategy(periodMs, bufferMax_), basePriority(prio)
+PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio, 
+                            int msgSizeMax_, int bufSizeMax_)
+    : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), CharmStrategy(), basePriority(prio)
 {
 }
 
@@ -23,19 +30,23 @@ void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
 
     streamingMsgBuf[pe].enq(cmsg);
     streamingMsgCount[pe]++;   
+    bufSize[pe]+=cmsg->getSize();
 
     if(msg_prio <= basePriority)
         flushPE(pe);
 
-    if (streamingMsgCount[pe] > bufferMax) 
+    if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax
         flushPE(pe);
 }
 
 void PrioStreaming::pup(PUP::er &p){
 
     StreamingStrategy::pup(p);
+    CharmStrategy::pup(p);
     p | basePriority;
 
-    if(p.isUnpacking())
+    if(p.isPacking() || p.isUnpacking())
         minPrioVec.resize(CkNumPes());
 }
+
+/*@}*/