Checking in a new broadcast strategy which broadcasts to arrays along a load-balanced...
[charm.git] / src / ck-com / StreamingStrategy.h
1 #ifndef STREAMING_STRATEGY
2 #define STREAMING_STRATEGY
3 #include "ComlibManager.h"
4
5 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
6
7 class StreamingStrategy : public CharmStrategy {
8  protected:
9     CkQ<CharmMessageHolder *> *streamingMsgBuf;
10     int *streamingMsgCount;
11     int PERIOD, bufferMax;
12     CmiBool shortMsgPackingFlag, idleFlush;
13
14     int streaming_handler_id; //Getting rid of multiple send
15
16     /// Flush all messages destined for this processor:
17     void flushPE(int destPE);
18     
19  public:
20     /**
21      Create a streaming strategy, suitable for passing to ComlibManager.
22      These are the criteria for flushing all pending messages:
23        - it's been at least period (in ms) since the last flush, or
24        - the processor just went idle.
25      Thses criteria flush a single PE's pending messages:
26        - more than bufferMax messages to buffered for one PE.
27     */
28     StreamingStrategy(int periodMs=10,int bufferMax=1000);
29     StreamingStrategy(CkMigrateMessage *m) : CharmStrategy(m) {}
30     
31     virtual void insertMessage(CharmMessageHolder *msg);
32     virtual void doneInserting();
33     
34     virtual void beginProcessing(int ignored);
35
36     virtual void pup(PUP::er &p);
37     virtual void enableShortArrayMessagePacking()
38         {shortMsgPackingFlag=CmiTrue;} //Should be used only for array
39                                        //messages
40
41     virtual void disableIdleFlush() { idleFlush = CmiFalse;}
42
43     /// Register self to be flushed again after a delay.
44     void registerFlush(void);
45     /// Flush all pending messages:
46     void periodicFlush();
47
48     PUPable_decl(StreamingStrategy);
49 };
50 #endif