Added strategy parameters for maximum buffer size and maximum message size
[charm.git] / src / ck-com / PrioStreaming.C
1 #include "PrioStreaming.h"
2 #include "MsgPacker.h"
3
4 PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio, 
5                              int msgSizeMax_, int bufSizeMax_)
6     : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), basePriority(prio)
7 {
8 }
9
10 void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
11
12     ComlibPrintf("Prio Straming: InsertMessage %d, %d\n",  
13                  PERIOD, bufferMax);
14
15     int pe=cmsg->dest_proc;
16     char* msg = cmsg->getCharmMessage();
17     envelope *env = UsrToEnv(msg);
18     int msg_prio = *(int*)env->getPrioPtr();
19
20     if(streamingMsgCount[pe] == 0) 
21         minPrioVec[pe] = msg_prio;
22     else if(minPrioVec[pe] > msg_prio)
23         minPrioVec[pe] = msg_prio;
24
25     streamingMsgBuf[pe].enq(cmsg);
26     streamingMsgCount[pe]++;   
27     bufSize[pe]+=cmsg->getSize();
28
29     if(msg_prio <= basePriority)
30         flushPE(pe);
31
32     if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax) 
33         flushPE(pe);
34 }
35
36 void PrioStreaming::pup(PUP::er &p){
37
38     StreamingStrategy::pup(p);
39     p | basePriority;
40
41     if(p.isUnpacking())
42         minPrioVec.resize(CkNumPes());
43 }