The new version of comlib! This version passed "make test" in charm/tests on order...
[charm.git] / src / conv-com / StreamingStrategy.h
1 /**
2    @addtogroup ComlibConverseStrategy
3    @{
4    @file 
5 */
6
7 #ifndef STREAMING_STRATEGY
8 #define STREAMING_STRATEGY
9 //#include "convcomlibmanager.h"
10 #include "convcomlibstrategy.h"
11
12 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
13 #define MAX_NUM_STREAMING_MESSAGES 1000
14 #define DEFAULT_TIMEOUT 10
15
16 CkpvExtern(int, streaming_handler_id);
17 extern void StreamingHandlerFn(void *msg);
18
19 /**
20  * The header prepended to combine messages by StreamingStrategy and derived
21  * classes.
22  */
23 struct StreamingMessage {
24   char header[CmiReservedHeaderSize];
25   CmiUInt4 srcPE;
26   CmiUInt4 nmsgs;
27 };
28
29 PUPbytes(StreamingMessage);
30
31 /**
32  * Strategy that buffers small messages and combines them to send few bigger
33  * messages, and therefore gain on sending overhead.
34
35  These are the criteria for flushing all pending messages:
36  <ul>
37  <li> it's been at least period (in ms) since the last flush, or
38  <li> the processor just went idle.
39  </ul>
40
41  These criteria flush a single PE's pending messages:
42  <ul>
43  <li> more than bufferMax messages buffered for one PE, or.
44  <li> total size of buffered messages > bufSizeMax
45  </ul>
46  */
47 class StreamingStrategy : public Strategy {
48  protected:
49     CkQ<MessageHolder *> *streamingMsgBuf;
50     int *streamingMsgCount;
51     int *bufSize;
52     int bufferMax;
53     int msgSizeMax;
54     int bufSizeMax;
55     double PERIOD;
56     //CmiBool shortMsgPackingFlag;
57     CmiBool idleFlush;
58
59     //int streaming_handler_id; //Getting rid of multiple send
60
61     /// Flush all messages destined for this processor:
62     void flushPE(int destPE);
63     
64  public:
65     /**
66      Create a streaming strategy, suitable for passing to Comlib.
67      These are the criteria for flushing all pending messages:
68        - it's been at least period (in ms) since the last flush, or
69        - the processor just went idle.
70      These criteria flush a single PE's pending messages:
71        - more than bufferMax messages to buffered for one PE, or
72        - max buffer size reached
73      Messages above the size threshold are sent directly without using the strategy  .
74     */
75     StreamingStrategy(int periodMs=DEFAULT_TIMEOUT, 
76                       int bufferMax=MAX_NUM_STREAMING_MESSAGES,
77                       int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE, 
78                       int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
79     StreamingStrategy(double periodMs=DEFAULT_TIMEOUT, 
80                       int bufferMax=MAX_NUM_STREAMING_MESSAGES, 
81                       int msgSizeMax=MAX_STREAMING_MESSAGE_SIZE, 
82                       int bufSizeMax=MAX_STREAMING_MESSAGE_SIZE*MAX_NUM_STREAMING_MESSAGES);
83
84     StreamingStrategy(CkMigrateMessage *m) : Strategy(m) {}
85     
86     virtual void insertMessage(MessageHolder *msg);
87     virtual void doneInserting();
88     
89     virtual void handleMessage(void *msg) {
90       CmiAbort("[%d] StreamingStrategy::handleMessage should never be called\n");
91     }
92
93     //virtual void beginProcessing(int ignored);
94
95     virtual void pup(PUP::er &p);
96     //virtual void enableShortArrayMessagePacking()
97     //    {shortMsgPackingFlag=CmiTrue;} //Should be used only for array
98                                        //messages
99
100     virtual void disableIdleFlush() { idleFlush = CmiFalse;}
101
102     /// Register self to be flushed again after a delay.
103     void registerFlush(void);
104     /// Flush all pending messages:
105     void periodicFlush();
106
107     PUPable_decl(StreamingStrategy);
108 };
109 #endif
110
111 /*@}*/