Fixing memory leak in a debugging routine.
[charm.git] / src / ck-com / PrioStreaming.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file 
5 */
6
7 #include "PrioStreaming.h"
8 //#include "MsgPacker.h"
9
10 PrioStreaming::PrioStreaming(int periodMs,int bufferMax_, int prio, 
11                              int msgSizeMax_, int bufSizeMax_)
12     : StreamingStrategy(periodMs, bufferMax_, msgSizeMax_, bufSizeMax_), CharmStrategy(), basePriority(prio)
13 {
14 }
15
16 void PrioStreaming::insertMessage(CharmMessageHolder *cmsg) {
17
18     ComlibPrintf("Prio Straming: InsertMessage %d, %d\n",  
19                  PERIOD, bufferMax);
20
21     int pe=cmsg->dest_proc;
22     char* msg = cmsg->getCharmMessage();
23     envelope *env = UsrToEnv(msg);
24     int msg_prio = *(int*)env->getPrioPtr();
25
26     if(streamingMsgCount[pe] == 0) 
27         minPrioVec[pe] = msg_prio;
28     else if(minPrioVec[pe] > msg_prio)
29         minPrioVec[pe] = msg_prio;
30
31     streamingMsgBuf[pe].enq(cmsg);
32     streamingMsgCount[pe]++;   
33     bufSize[pe]+=cmsg->getSize();
34
35     if(msg_prio <= basePriority)
36         flushPE(pe);
37
38     if (streamingMsgCount[pe] > bufferMax || bufSize[pe] > bufSizeMax) 
39         flushPE(pe);
40 }
41
42 void PrioStreaming::pup(PUP::er &p){
43
44     StreamingStrategy::pup(p);
45     CharmStrategy::pup(p);
46     p | basePriority;
47
48     if(p.isPacking() || p.isUnpacking())
49         minPrioVec.resize(CkNumPes());
50 }
51
52 /*@}*/