8650457b8e957784f7ea54330d85278ccada7d61
[charm.git] / src / ck-com / StreamingStrategy.h
1 #ifndef STREAMING_STRATEGY
2 #define STREAMING_STRATEGY
3 #include "ComlibManager.h"
4
5 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
6
7
8 class StreamingStrategy : public CharmStrategy {
9  protected:
10     CkQ<CharmMessageHolder *> *streamingMsgBuf;
11     int *streamingMsgCount;
12     int PERIOD, bufferMax;
13     CmiBool shortMsgPackingFlag, idleFlush;
14
15     /// Flush all messages destined for this processor:
16     void flushPE(int destPE);
17     
18  public:
19     /**
20      Create a streaming strategy, suitable for passing to ComlibManager.
21      These are the criteria for flushing all pending messages:
22        - it's been at least period (in ms) since the last flush, or
23        - the processor just went idle.
24      Thses criteria flush a single PE's pending messages:
25        - more than bufferMax messages to buffered for one PE.
26     */
27     StreamingStrategy(int periodMs=10,int bufferMax=1000);
28     StreamingStrategy(CkMigrateMessage *m) : CharmStrategy(m) {}
29     
30     virtual void insertMessage(CharmMessageHolder *msg);
31     virtual void doneInserting();
32     
33     virtual void beginProcessing(int ignored);
34
35     virtual void pup(PUP::er &p);
36     virtual void enableShortArrayMessagePacking()
37         {shortMsgPackingFlag=CmiTrue;} //Should be used only for array
38                                        //messages
39
40     virtual void disableIdleFlush() { idleFlush = CmiFalse;}
41
42     /// Register self to be flushed again after a delay.
43     void registerFlush(void);
44     /// Flush all pending messages:
45     void periodicFlush();
46
47     PUPable_decl(StreamingStrategy);
48 };
49 #endif