Checking in a new broadcast strategy which broadcasts to arrays along a load-balanced...
[charm.git] / src / ck-com / StreamingStrategy.C
1 #include "StreamingStrategy.h"
2 #include "MsgPacker.h"
3
4 void StreamingHandlerFn(void *msg) {
5     CombinedMessage hdr;
6     
7     CkPrintf("In streaming handler fn\n");
8
9     PUP_fromCmiAllocMem fp(msg);
10     fp | hdr;
11     
12     for(int count = 0; count < hdr.nmsgs; count ++) {
13         char *msg;
14         fp.pupCmiAllocBuf((void **)&msg);
15         int size = SIZEFIELD(msg);
16         CmiSyncSendAndFree(CkMyPe(), size, msg);
17     }
18     CmiFree(msg);
19     return;
20 }
21
22 StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
23     : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
24 {
25     streamingMsgBuf=NULL;
26     streamingMsgCount=NULL;
27     shortMsgPackingFlag = CmiFalse;
28     idleFlush = CmiTrue;
29     streaming_handler_id = 0;
30     setType(ARRAY_STRATEGY);
31 }
32
33 void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
34
35     int pe=cmsg->dest_proc;
36     char *msg = cmsg->getCharmMessage();
37     envelope *env = UsrToEnv(msg);
38     int size = env->getTotalsize();
39
40     if(size > MAX_STREAMING_MESSAGE_SIZE) {//AVOID COPYING
41         ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
42         CmiSyncSendAndFree(pe, size, (char *)env);
43         delete cmsg;
44         return;
45     }
46
47     ComlibPrintf("StreamingStrategy::insertMessage: buffering t=%d, n=%d, s=%d\n",  
48                  PERIOD, bufferMax, size);
49     
50     streamingMsgBuf[pe].enq(cmsg);
51     streamingMsgCount[pe]++;
52     if (streamingMsgCount[pe] > bufferMax) flushPE(pe);
53 }
54
55 void StreamingStrategy::doneInserting() {
56   ComlibPrintf("[%d] In Streaming strategy::doneInserting\n", CkMyPe());
57   //Do nothing
58 }
59
60 /// Send off all accumulated messages for this PE:
61 void StreamingStrategy::flushPE(int pe) {
62
63   //CkPrintf("Checking %d\n", pe);
64
65   if(streamingMsgCount[pe] == 0)
66       return; //Nothing to do.
67   
68   CharmMessageHolder *cmsg, *toBeDeleted = NULL;
69   int size = 0;
70   if(shortMsgPackingFlag){
71       MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
72       CombinedMessage *msg; 
73       mpack.getMessage(msg, size);
74       ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
75                    CkMyPe(), streamingMsgCount[pe], pe); 
76       CmiSyncSendAndFree(pe, size, (char *)msg);
77       streamingMsgCount[pe] = 0;
78   }
79   else {
80     // Build a CmiMultipleSend list of messages to be sent off:
81     int msg_count=streamingMsgCount[pe], msg_pe=0;
82     if(msg_count == 1) {
83         cmsg = streamingMsgBuf[pe].deq();
84         char *msg = cmsg->getCharmMessage();
85         envelope *env = UsrToEnv(msg);
86         int size = env->getTotalsize();
87         CmiSyncSendAndFree(pe, size, (char *)env);
88         ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
89                      CkMyPe(), pe);            
90         delete cmsg;
91         streamingMsgCount[pe] = 0;
92         return;
93     }
94     
95     char **msgComps = new char*[msg_count];
96     int *sizes = new int[msg_count];
97     ComlibPrintf("[%d] StreamingStrategy::flushPE: %d messages to %d\n", 
98                  CkMyPe(), msg_count, pe);            
99     while (!streamingMsgBuf[pe].isEmpty()) {
100         cmsg = streamingMsgBuf[pe].deq();
101         char *msg = cmsg->getCharmMessage();
102         envelope *env = UsrToEnv(msg);
103         sizes[msg_pe] = env->getTotalsize();
104         msgComps[msg_pe] = (char *)env;
105         msg_pe++;
106         
107         // Link cmsg into the toBeDeleted list:
108         cmsg->next = toBeDeleted;
109         toBeDeleted = cmsg;            
110     }
111     
112     if (msg_count!=msg_pe) 
113         CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
114     
115     ComlibPrintf("--> Sending %d Messages to PE %d\n", msg_count, pe);
116     
117     CmiMultipleSend(pe, msg_count, sizes, msgComps);
118     delete [] msgComps;
119     delete [] sizes;
120     streamingMsgCount[pe] = 0;
121     
122     // Traverse the tobeDeleted list:
123     cmsg = toBeDeleted;
124     while (toBeDeleted) {
125         toBeDeleted = (CharmMessageHolder *)toBeDeleted->next;
126         CkFreeMsg(cmsg->getCharmMessage());
127         delete cmsg;
128         cmsg = toBeDeleted;            
129     }     
130     
131     /*
132     PUP_cmiAllocSizer sp;
133     CombinedMessage hdr;
134     
135     sp | hdr;
136
137     int nmsgs = streamingMsgCount[pe];
138     for(int count = 0; count < nmsgs; count++) {
139         cmsg = streamingMsgBuf[pe][count];
140         char *msg = cmsg->getCharmMessage();
141         envelope *env = UsrToEnv(msg);
142         size = env->getTotalsize();
143         
144         sp.pupCmiAllocBuf((void **)&env, size);
145     }
146     
147     char *newmsg = (char *)CmiAlloc(sp.size());
148     PUP_toCmiAllocMem mp(newmsg);
149     
150     hdr.aid.setZero();
151     hdr.srcPE = CkMyPe();
152     hdr.nmsgs = nmsgs;
153     mp | hdr;
154     
155     for(int count = 0; count < nmsgs; count++) {
156         cmsg = streamingMsgBuf[pe][count];
157         char *msg = cmsg->getCharmMessage();
158         envelope *env = UsrToEnv(msg);
159         size = env->getTotalsize();
160         
161         mp.pupCmiAllocBuf((void **)&env, size);
162     }
163
164     for(int count = 0; count < nmsgs; count++) {
165         cmsg = streamingMsgBuf[pe][count];
166         delete cmsg;
167     }    
168     
169     streamingMsgCount[pe] = 0;
170     CmiSetHandler(newmsg, streaming_handler_id);
171     CmiSyncSendAndFree(pe, sp.size(), newmsg); 
172     */
173   }
174 }
175
176 void StreamingStrategy::periodicFlush() {
177     for (int proc = 0; proc < CkNumPes(); proc++) 
178         flushPE(proc);
179 }
180
181 struct MsgStruct {
182     char header[CmiReservedHeaderSize];
183     void *addr;
184 };
185
186
187 void testHandler(void *msg) {
188     StreamingStrategy *s;
189
190     MsgStruct *mstruct = (MsgStruct *)msg;
191
192     s = (StreamingStrategy *) (mstruct->addr);
193     s->periodicFlush();
194
195     CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
196 }
197
198 /// This routine is called via CcdCallFnAfter to flush all messages:
199 static void call_delayFlush(void *arg,double curWallTime) {
200     StreamingStrategy *s=(StreamingStrategy *)arg;
201     s->periodicFlush();
202     s->registerFlush(); //Set ourselves up to be called again
203 }
204
205 void StreamingStrategy::registerFlush(void) {
206     //CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
207     CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CkMyPe());
208 }
209
210 /// This routine is called via CcdCallOnCondition to flush all messages:
211 static void call_idleFlush(void *arg,double curWallTime) {
212     StreamingStrategy *s=(StreamingStrategy *)arg;
213     s->periodicFlush();
214 }
215
216 // When we're finally ready to go, register for timeout and idle flush.
217 void StreamingStrategy::beginProcessing(int ignored) {
218     registerFlush();
219     //if(idleFlush)
220     //  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
221     //                             (CcdVoidFn)call_idleFlush, 
222     //                             (void *)this, CkMyPe());
223     
224     streaming_handler_id = CkRegisterHandler(StreamingHandlerFn);
225     
226     /*
227       int handler = CkRegisterHandler(testHandler);
228       
229       MsgStruct *msg = (MsgStruct *)CmiAlloc(sizeof(MsgStruct));
230       msg->addr = this;
231       CmiSetHandler(msg, handler);
232       
233       CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
234     */
235 }
236
237 void StreamingStrategy::pup(PUP::er &p){
238
239   CharmStrategy::pup(p);
240   p | PERIOD;
241   p | bufferMax;
242   p | shortMsgPackingFlag;
243   p | idleFlush;
244   p | streaming_handler_id;
245
246   if(p.isUnpacking()) {
247       streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
248       streamingMsgCount = new int[CkNumPes()];
249       for(int count = 0; count < CkNumPes(); count ++)
250           streamingMsgCount[count] = 0;
251   }
252 }
253
254 //PUPable_def(StreamingStrategy);