merging with main branch
[charm.git] / src / ck-com / PrioStreaming.h
1 /**
2    @addtogroup ComlibCharmStrategy
3    *@{
4    @file 
5 */
6
7 #ifndef PRIO_STREAMING
8 #define PRIO_STREAMING
9
10 #include "ComlibManager.h"
11 #include "StreamingStrategy.h"
12
13 /**
14  * Class that streams messages the same way as StreamingStrategy, but adding a
15  * bypass for high priority messages to be flushed immediately.
16
17  These are the criteria for flushing all pending messages: 
18  <ul>
19  <li> it's been at least period (in ms) since the last flush, or 
20  <li> the processor just went idle.
21  </ul>
22
23  These criteria flush a single E's pending messages: 
24  <ul>
25  <li> more than bufferMax messages to buffered for one PE.
26  <li>Current message is a high priority message
27  </ul>
28  */
29 class PrioStreaming : public StreamingStrategy, public CharmStrategy {
30  protected:
31     int basePriority;
32     CkVec<int> minPrioVec;
33     
34  public:
35     /**
36      Create a priority based streaming strategy, suitable for passing
37      to ComlibManager.  
38      These are the criteria for flushing all pending messages: 
39
40      - it's been at least period (in ms) since the last flush, or 
41
42      - the processor just went idle.  Thses criteria flush a single 
43      PE's pending messages: 
44
45      - more than bufferMax messages to buffered for one PE.
46
47      - Current message is a high priority message
48     */
49
50     PrioStreaming(int periodMs=DEFAULT_TIMEOUT, 
51                   int bufferMax=MAX_NUM_STREAMING_MESSAGES, 
52                   int prio=0,
53                   int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE,
54                   int bufSizeMAX=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
55     PrioStreaming(CkMigrateMessage *m) : StreamingStrategy(m), CharmStrategy(m) {}
56
57     void insertMessage(MessageHolder *msg) {insertMessage((CharmMessageHolder*)msg);}
58     virtual void insertMessage(CharmMessageHolder *msg);
59
60     //If new priority is greater than current priority, 
61     //then flush all queues which have relatively high priority messages
62     inline void setBasePriority(int p) {
63         if(p > basePriority) {
64             for(int count =0; count < CkNumPes(); count++)
65                 if(minPrioVec[count] <= p)
66                     flushPE(count);
67         }        
68         basePriority = p;
69     }
70
71     virtual void pup(PUP::er &p);
72     PUPable_decl(PrioStreaming);
73 };
74 #endif
75
76 /*@}*/