The new version of comlib! This version passed "make test" in charm/tests on order...
[charm.git] / src / conv-com / MeshStreamingStrategy.h
1 /**
2    @addtogroup ComlibConverseStrategy
3    @{
4    @file
5    This is MeshStreamingStrategy, a strategy in the Charm++ communications
6    library.  See MeshStreamingStrategy.C for detailed comments.
7
8    @author Greg Koenig
9    @author Moved into converse comlib by Filippo Gioachin 02/2006
10 */
11
12 #ifndef MESH_STREAMING_STRATEGY
13 #define MESH_STREAMING_STRATEGY
14
15 #include <math.h>
16
17 #include "convcomlibmanager.h"
18
19 #define DEFAULT_FLUSH_PERIOD 10        // milliseconds
20 #define DEFAULT_MAX_BUCKET_SIZE 1000   // number of messages
21
22 CkpvExtern(int, streaming_column_handler_id);
23 extern void streaming_column_handler(void *msg);
24
25 /// Passed along with every row message header in the first iteration of
26 /// the MesgStreamingStrategy
27 struct MeshStreamingHeader {
28     char conv_hdr[CmiMsgHeaderSizeBytes];
29     int strategy_id;
30     int num_msgs;
31 };
32
33 PUPbytes(MeshStreamingHeader);
34
35 /**
36  * This is MeshStreamingStrategy, a strategy in the Charm++ communications
37  * library.  In this strategy, processes are organized into a mesh as
38  * depicted in the following diagram:
39  *
40  *  1    2    3    4
41  *
42  *  5    6    7    8
43  *
44  *  9   10   11   12
45  * 
46  * 13   14   15   16
47  *
48  * If, for example, PE 6 sends a message to PE 4 and a message to PE 12,
49  * both messages will be stored in a column bucket on PE 6 for destination
50  * column 3.  After DEFAULT_FLUSH_PERIOD milliseconds elapse or
51  * DEFAULT_MAX_BUCKET_SIZE messages accumulate, all messages in the bucket
52  * for column 3 are flushed by bundling them together and sending them at
53  * once to PE 8.  When they arrive on PE 8 and are delivered to its
54  * column_handler(), PE 8 breaks the messages in the bundle apart and
55  * stores messages destined for individual rows in separate row buckets.
56  * In the case of our example, the message destined for PE 4 would be
57  * stored in the row bucket for row 0 and the message destined for PE 12
58  * would be stored in the row bucket for row 2.  Again, after
59  * DEFAULT_FLUSH_PERIOD milliseconds elapse or DEFAULT_MAX_BUCKET_SIZE
60  * messages accumulate, all messages in the row buckets are flushed by
61  * bundling them together and sending them at once to their respective
62  * destination PEs by calling CmiMultipleSend().
63  *
64  * The advantage of bundling messages together is that to send to N
65  * PEs in a computation, sqrt(N) actual messages need to be sent on the
66  * network.  This trades computational overhead of bundling messages
67  * for communication overhead; if the processors are fast relative to
68  * the network, we win.
69  *
70  * To understand the bundling/unbundling operations, knowledge of Converse
71  * and Charm++ message header formats is required.  I have attempted to
72  * provide documentation within this code to describe what is going on.
73  */
74 class MeshStreamingStrategy : public Strategy
75 {
76   //CmiBool shortMsgPackingFlag;
77  public:
78     MeshStreamingStrategy (int period=DEFAULT_FLUSH_PERIOD,
79                            int bucket_size=DEFAULT_MAX_BUCKET_SIZE);
80     MeshStreamingStrategy (CkMigrateMessage *m) : Strategy(m){ }
81         
82     void insertMessage (MessageHolder *msg);
83     void doneInserting ();
84     //void beginProcessing (int ignored);
85     void RegisterPeriodicFlush (void);
86     void FlushColumn (int column);
87     void FlushRow (int row);
88     void FlushBuffers (void);
89     void InsertIntoRowBucket (int row, char *msg);
90     int GetRowLength (void);
91     virtual void pup (PUP::er &p);
92     PUPable_decl (MeshStreamingStrategy);
93
94     virtual void handleMessage(void *msg) {
95       CmiAbort("[%d] MeshStreamingStrategy::handleMessage should never be called\n");
96     }
97
98     //Should be used only for array messages
99     //virtual void enableShortArrayMessagePacking()
100     //{shortMsgPackingFlag=CmiTrue;} 
101
102   private:
103
104     int num_pe;
105     int num_columns;
106     int num_rows;
107     int row_length;
108
109     int my_pe;
110     int my_column;
111     int my_row;
112
113     int flush_period;
114     int max_bucket_size;
115
116     //int strategy_id;
117
118     //int column_handler_id;
119
120     CkQ<char *> *column_bucket;
121     CkQ<int> *column_destQ;
122
123     int *column_bytes;
124     CkQ<char *> *row_bucket;
125 };
126 #endif
127
128 /*@}*/