fixing for ft
[charm.git] / src / conv-com / StreamingStrategy.C
1 /**
2    @addtogroup ComlibConverseStrategy
3    @{
4    @file 
5 */
6
7 #include "StreamingStrategy.h"
8 //#include "MsgPacker.h"
9 #include "pup_cmialloc.h"
10
11 /** The handler registerd to StreamingHandlerFn */
12 CpvDeclare(int, streaming_handler_id);
13 /**
14  * Handler used to receive incoming combined messages, split them into the
15  * individual messages and deliver all of them to the application.
16  */
17 void StreamingHandlerFn(void *msg) {
18     StreamingMessage hdr;
19     
20     ComlibPrintf("[%d] In streaming handler fn\n",CmiMyPe());
21
22     PUP_fromCmiAllocMem fp(msg);
23     fp | hdr;
24     
25     for(int count = 0; count < hdr.nmsgs; count ++) {
26         char *msg;
27         fp.pupCmiAllocBuf((void **)&msg);
28         int size = SIZEFIELD(msg);
29         CmiSyncSendAndFree(CmiMyPe(), size, msg);
30     }
31     CmiFree(msg);
32     return;
33 }
34
35 StreamingStrategy::StreamingStrategy(int periodMs, int bufferMax_, 
36                                      int msgSizeMax_, int bufSizeMax_)
37     : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_), 
38       bufSizeMax(bufSizeMax_), Strategy() {
39     streamingMsgBuf = NULL;
40     streamingMsgCount = NULL;
41     bufSize = NULL;
42     //shortMsgPackingFlag = CmiFalse;
43     idleFlush = CmiTrue;
44     //streaming_handler_id = 0;
45     setType(CONVERSE_STRATEGY);
46 }
47
48 StreamingStrategy::StreamingStrategy(double periodMs, int bufferMax_, 
49                                      int msgSizeMax_, int bufSizeMax_)
50     : PERIOD(periodMs), bufferMax(bufferMax_), msgSizeMax(msgSizeMax_), 
51       bufSizeMax(bufSizeMax_), Strategy() {
52     streamingMsgBuf = NULL;
53     streamingMsgCount = NULL;
54     bufSize = NULL;
55     //shortMsgPackingFlag = CmiFalse;
56     idleFlush = CmiTrue;
57     //streaming_handler_id = 0;
58     setType(CONVERSE_STRATEGY);
59 }
60
61 void StreamingStrategy::insertMessage(MessageHolder *cmsg) {
62
63     int pe=cmsg->dest_proc;
64     char *msg = cmsg->getMessage();
65     //envelope *env = UsrToEnv(msg);
66     int size = cmsg->getSize(); // env->getTotalsize();
67
68     if(size > msgSizeMax) {//AVOID COPYING
69         ComlibPrintf("[%d] StreamingStrategy::insertMessage: to %d direct send %d\n",CmiMyPe(),pe,size);
70         CmiSyncSendAndFree(pe, size, msg);
71         delete cmsg;
72         return;
73     }
74
75     ComlibPrintf("[%d] StreamingStrategy::insertMessage: buffering t=%g, n=%d, d=%d, s=%d\n",
76                  CmiMyPe(), PERIOD, bufferMax, pe, size);
77     
78     streamingMsgBuf[pe].enq(cmsg);
79     streamingMsgCount[pe]++;
80     bufSize[pe]+=size;
81     if (streamingMsgCount[pe] >= bufferMax || bufSize[pe] >= bufSizeMax) flushPE(pe);
82 }
83
84 void StreamingStrategy::doneInserting() {
85   ComlibPrintf("[%d] StreamingStrategy::doneInserting\n", CmiMyPe());
86   //Do nothing
87
88   periodicFlush();
89 }
90
91 /// Send off all accumulated messages for this PE:
92 void StreamingStrategy::flushPE(int pe) {
93
94   //CkPrintf("Checking %d\n", pe);
95
96   if(streamingMsgCount[pe] == 0)
97       return; //Nothing to do.
98   
99   MessageHolder *cmsg;
100   int size = 0;
101  
102
103     // Build a CmiMultipleSend list of messages to be sent off:
104     int msg_count=streamingMsgCount[pe];
105
106     // If we have a single message we don't want to copy it
107     if(msg_count == 1) {
108         cmsg = streamingMsgBuf[pe].deq();
109         char *msg = cmsg->getMessage();
110         //envelope *env = UsrToEnv(msg);
111         int size = cmsg->getSize();
112         CmiSyncSendAndFree(pe, size, msg);
113         ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
114                      CmiMyPe(), pe);            
115         delete cmsg;
116         streamingMsgCount[pe] = 0;
117         bufSize[pe] = 0;
118         return;
119     }
120    
121     
122     PUP_cmiAllocSizer sp;
123     StreamingMessage hdr;
124     
125     sp | hdr;
126
127     int nmsgs = streamingMsgCount[pe];
128     int count;
129     for(count = 0; count < nmsgs; count++) {
130         cmsg = streamingMsgBuf[pe][count];
131         char *msg = cmsg->getMessage();
132         //envelope *env = UsrToEnv(msg);
133         size = cmsg->getSize();
134         
135         sp.pupCmiAllocBuf((void**)&msg, size);
136     }
137     
138     char *newmsg = (char *)CmiAlloc(sp.size());
139     PUP_toCmiAllocMem mp(newmsg);
140     
141     hdr.srcPE = CmiMyPe();
142     hdr.nmsgs = nmsgs;
143     mp | hdr;
144     
145     for(count = 0; count < nmsgs; count++) {
146         cmsg = streamingMsgBuf[pe][count];
147         char *msg = cmsg->getMessage();
148         //envelope *env = UsrToEnv(msg);
149         size = cmsg->getSize();
150         
151         mp.pupCmiAllocBuf((void**)&msg, size);
152     }
153
154     for(count = 0; count < nmsgs; count++) {
155         cmsg = streamingMsgBuf[pe].deq();
156         //CkFreeMsg(cmsg->getCharmMessage());
157         CmiFree(cmsg->getMessage());
158         delete cmsg;
159     }    
160     
161     streamingMsgCount[pe] = 0;
162     bufSize[pe] = 0;
163     CmiSetHandler(newmsg, CpvAccess(streaming_handler_id));
164     CmiSyncSendAndFree(pe, sp.size(), newmsg); 
165     //}
166 }
167
168 void StreamingStrategy::periodicFlush() {
169     for (int proc = 0; proc < CmiNumPes(); proc++) 
170         flushPE(proc);
171 }
172
173 /*
174 struct MsgStruct {
175     char header[CmiReservedHeaderSize];
176     void *addr;
177 };
178
179
180 void testHandler(void *msg) {
181     StreamingStrategy *s;
182
183     MsgStruct *mstruct = (MsgStruct *)msg;
184
185     s = (StreamingStrategy *) (mstruct->addr);
186     s->periodicFlush();
187
188     CmiSyncSendAndFree(CmiMyPe(), sizeof(MsgStruct), (char *)msg);
189 }
190 */
191
192 /// This routine is called via CcdCallFnAfter to flush all messages:
193 static void call_delayFlush(void *arg,double curWallTime) {
194     StreamingStrategy *s=(StreamingStrategy *)arg;
195     s->periodicFlush();
196     s->registerFlush(); //Set ourselves up to be called again
197 }
198
199 void StreamingStrategy::registerFlush(void) {
200     //CkPrintf("[%d] Will call function again every %d ms\n",CmiMyPe(),PERIOD);
201     CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CmiMyPe());
202 }
203
204 /// This routine is called via CcdCallOnCondition to flush all messages:
205 static void call_idleFlush(void *arg,double curWallTime) {
206     StreamingStrategy *s=(StreamingStrategy *)arg;
207     s->periodicFlush();
208 }
209
210 // When we're finally ready to go, register for timeout and idle flush.
211 /*
212 void StreamingStrategy::beginProcessing(int ignored) {
213     registerFlush();
214     //if(idleFlush)
215     //  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
216     //                             (CcdVoidFn)call_idleFlush, 
217     //                             (void *)this, CmiMyPe());
218     
219     streaming_handler_id = CkRegisterHandler(StreamingHandlerFn);
220     
221 //       int handler = CkRegisterHandler(testHandler);
222       
223 //       MsgStruct *msg = (MsgStruct *)CmiAlloc(sizeof(MsgStruct));
224 //       msg->addr = this;
225 //       CmiSetHandler(msg, handler);
226       
227 //       CmiSyncSendAndFree(CmiMyPe(), sizeof(MsgStruct), (char *)msg);
228
229 }
230 */
231
232 void StreamingStrategy::pup(PUP::er &p){
233
234   Strategy::pup(p);
235   p | PERIOD;
236   p | bufferMax;
237   p | msgSizeMax;
238   //p | shortMsgPackingFlag;
239   p | bufSizeMax;
240   p | idleFlush;
241   //p | streaming_handler_id;
242
243   if(p.isPacking() || p.isUnpacking()) {
244       streamingMsgBuf = new CkQ<MessageHolder *>[CmiNumPes()];
245       streamingMsgCount = new int[CmiNumPes()];
246       bufSize = new int[CmiNumPes()];
247       for(int count = 0; count < CmiNumPes(); count ++) {
248         streamingMsgCount[count] = 0;
249         bufSize[count] = 0;
250       }
251   }
252
253   // packing is done once in processor 0, unpacking is done once in all processors except 0
254   if (p.isPacking() || p.isUnpacking()) registerFlush();
255 }
256
257 PUPable_def(StreamingStrategy)
258
259 /*@}*/