c32d950abf7e433f788da0ec907bada29869109c
[charm.git] / src / ck-com / PrioStreaming.C
1 #include "PrioStreaming.h"
2 #include "MsgPacker.h"
3
4 PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio)
5     : StreamingStrategy(periodMs, bufferMax_), basePriority(prio)
6 {
7 }
8
9 void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
10
11     ComlibPrintf("Prio Straming: InsertMessage %d, %d\n",  
12                  PERIOD, bufferMax);
13
14     int pe=cmsg->dest_proc;
15     streamingMsgBuf[pe].enq(cmsg);
16     streamingMsgCount[pe]++;
17     if (streamingMsgCount[pe] > bufferMax) {
18         flushPE(pe);
19         return;
20     }
21     
22     char* msg = cmsg->getCharmMessage();
23     envelope *env = UsrToEnv(msg);
24     int msg_prio = *(int*)env->getPrioPtr();
25     
26     if(msg_prio <= basePriority)
27         flushPE(pe);
28 }
29
30 void PrioStreaming::pup(PUP::er &p){
31
32     StreamingStrategy::pup(p);
33
34     p | basePriority;
35
36 }