Merge cleanup fixes before static array optimizations
[charm.git] / src / conv-com / convcomlibmanager.C
1 /**
2    @addtogroup ConvComlib
3    @{
4    @file
5
6    @brief Implementations of convcomlibmanager.h classes. It also defines all
7    the converse handlers to perform the broadcast of the strategies to all the
8    processors (synchronization process). These routines provide this support
9    also to the charm layer.
10
11    @author Sameer Kumar 28/03/04
12    @author Heavily revised, Filippo Gioachin 01/06
13 */
14
15 #include "convcomlibmanager.h"
16 #include "routerstrategy.h"
17 #include "StreamingStrategy.h"
18 #include "MeshStreamingStrategy.h"
19 #include "pipebroadcastconverse.h"
20 #include "converse.h"
21
22 int com_debug=0;
23
24 /// The location in the global table where the converse Comlib manager is located.
25 CkpvDeclare(ConvComlibManager, conv_com_object);
26
27 /***************************************************************************
28  * Handlers section:
29  *
30  * Declarations of all the converse handler IDs used, together with all the
31  * functions they are associated with.
32  ***************************************************************************/
33
34 /** Handler to which send messages inside the comlib framework. To this handler
35     arrive all the messages sent by common strategies without specific needs.
36     Strategies that need particular rerouting of their messages use their own
37     specific handlers. */
38 CkpvDeclare(int, comlib_handler);
39 /// Method invoked upon receipt a message routed through comlib.
40 void *strategyHandler(void *msg) {
41     CmiMsgHeaderBasic *conv_header = (CmiMsgHeaderBasic *) msg;
42     int instid = conv_header->stratid;
43
44 #ifndef CMK_OPTIMIZE
45     // check that the instid is not zero, meaning a possibly uninitialized value
46     if (instid == 0) {
47       CmiAbort("Comlib strategy ID is zero, did you forget to initialize a variable?\n");
48     }
49 #endif
50     
51     // when handling a message always call the lowest level
52     Strategy *strat = ConvComlibGetStrategy(instid);
53     
54     strat->handleMessage(msg);
55     return NULL;
56 }
57
58 /** @file
59  * 
60  *  Strategies synchronization process:
61  *
62  * 1) the strategies are inserted on processor 0 (and possibly in other
63  *    processors with the same order. The strategies are marked as "new"
64  *
65  * 2) when ComlibDoneCreating is called, processor 0 broadcast all the new
66  *    strategies to all the processors, and marks them as "inSync"
67  *
68  * 3) when a processor receives a table it updates its personal table with the
69  *    incoming, it marks all the strategies just arrived as "inSync", and it
70  *    sends an acknowledgement back to processor 0.
71  *
72  * 4) when an acknowledgement is received by processor 0, a counter is
73  *    decremented. When it reaches 0, all the "inSync" strategies are switched
74  *    to status "ready" and they can start working. All the messages in the
75  *    tmplist are delivered. The sync is broadcasted.
76  *
77  * 5) when an acknowledgement is received by a processor other than 0, all the
78  *    "inSync" strategies are switched to "ready" and the messages in tmplist
79  *    are delivered.
80  *
81  * 6) in order to prevent two consecutive table broadcasts to interfere with
82  *    each other, an additional acknowledgement is sent back by each processor
83  *    to processor 0 to allow a new table update to happen.
84  */
85
86 /** Handler to accept the second acknowledgements, after which a new table can
87     be broadcasted. Executed only on processor 0. */
88 CkpvDeclare(int, comlib_ready);
89 /// Method to acknowledge the ready status
90 void *comlibReadyHandler(void *msg) {
91   ComlibPrintf("[%d] Received ready acknowledgement\n",CmiMyPe());
92   CmiAssert(CkpvAccess(conv_com_object).acksReceived > 0);
93   if (--CkpvAccess(conv_com_object).acksReceived == 0) {
94     // ok, we are done. Do we have to broadcast a new table?
95     ComlibPrintf("Strategy table propagation finished\n");
96     CkpvAccess(conv_com_object).busy = CmiFalse;
97     if (CkpvAccess(conv_com_object).doneCreatingScheduled) {
98       CkpvAccess(conv_com_object).doneCreating();
99     }
100   }
101   CmiFree(msg);
102   return NULL;
103 }
104
105 /** Handler to count and accept the acknowledgements of table received. On
106     processor zero this handler counts the number of acks. On all other
107     processors it simply accept it */
108 CkpvDeclare(int, comlib_table_received);
109
110 /// Method invoked upon receipt of an acknowledgement of table received
111 void *comlibTableReceivedHandler(void *msg) {
112   if (CmiMyPe() == 0) {
113         //    CmiPrintf("Num acks to go: %d\n",CkpvAccess(conv_com_object).acksReceived);
114     if (--CkpvAccess(conv_com_object).acksReceived == 0) {
115       CkpvAccess(conv_com_object).tableReady();
116       // reset acksReceived for the second step
117       //CmiPrintf("All acks received, broadcasting message to table_received\n");
118       CkpvAccess(conv_com_object).acksReceived = CmiNumPes() - 1;
119       CmiSyncBroadcastAndFree(CmiReservedHeaderSize, (char*)msg);
120     } else {
121       CmiFree(msg);
122     }
123   } else {
124     CkpvAccess(conv_com_object).tableReady();
125     CmiSetHandler(msg, CkpvAccess(comlib_ready));
126     CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
127   }  
128   return NULL;
129 }
130
131 /** Handler to broadcast all the strategies to all the processors. This is
132     invoked on all processors except processor zero, upon a broadcast message
133     from processor zero (in ComlibDoneInserting). */
134 CkpvDeclare(int, comlib_receive_table);
135
136 /// Method invoked upon receipt of the strategy table
137 void *comlibReceiveTableHandler(void *msg) {
138   // unpack the message into a StrategyWrapper
139   ComlibPrintf("Received new strategy table\n");
140   StrategyWrapper sw;
141   PUP::fromMem pm(((char*)msg)+CmiReservedHeaderSize);
142   pm|sw;
143
144   // insert the strategies into the local table
145   for (int i=0; i<sw.nstrats; ++i) {
146     Strategy *current = CkpvAccess(conv_com_object).getStrategy(sw.position[i]);
147     if (sw.replace[i] && current != NULL) {
148       // delete the old strategy. Since it is requested, it is safe
149       delete current;
150       current = NULL;
151       CkpvAccess(conv_com_object).decrementNumStrats(); 
152     }
153     if (current == NULL) {
154       // if current is NULL either the strategy has never been set yet, or we
155       // are replacing it
156       CkpvAccess(conv_com_object).setStrategy(sw.position[i], sw.strategy[i]);
157       CkpvAccess(conv_com_object).incrementNumStrats(); 
158     } else {
159       // let's delete the incoming strategy since it is not used
160       delete sw.strategy[i];
161     }
162     CkpvAccess(conv_com_object).inSync(sw.position[i]);
163   }
164
165   // cheat about the size of the message
166   CmiSetHandler(msg, CkpvAccess(comlib_table_received));
167   CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
168   return NULL;
169 }
170
171 /***************************************************************************
172  * ConvComlibManager section:
173  *
174  * Implementation of the functions defined in the ConvComlibManager class.
175  ***************************************************************************/
176
177 ConvComlibManager::ConvComlibManager(): strategyTable(MAX_NUM_STRATS+1){
178   nstrats = 0;
179   init_flag = CmiFalse;
180   acksReceived = 0;
181   doneCreatingScheduled = CmiFalse;
182   busy = CmiFalse;
183 }
184
185 /** Insert a strategy into the system table, and return a handle to be used to
186     access the strategy later. This handle is the location in the strategy
187     table. From the strategy itself the user can get a handle for its usage. */
188 int ConvComlibManager::insertStrategy(Strategy *s) {
189
190   // This is not right, remove this stupid restriction!
191   if(nstrats >= MAX_NUM_STRATS)
192     CmiAbort("Too Many strategies\n");
193
194   int index = ++nstrats;
195   StrategyTableEntry &st = strategyTable[index];
196   
197   if(st.strategy != NULL) CmiAbort("Trying to insert a strategy over another one!");
198   
199   st.strategy = s;
200   st.isNew = 1;
201   st.bracketedSetupFinished = 2*CkNumPes();
202   
203   //s->setInstance(index);
204   
205   // if the strategy is pure converse or pure charm the following
206   // line is a duplication, but if a charm strategy embed a converse
207   // strategy it is necessary to set the instanceID in both
208   //s->getConverseStrategy()->setInstance(index); DEPRECATED
209
210   return index;
211 }
212
213 void ConvComlibManager::doneCreating() {
214   ComlibPrintf("Called doneCreating\n");
215   if (busy) {
216     // we have to delay the table broadcast because we are in the middle of another one
217     doneCreatingScheduled = CmiTrue;
218     return;
219   }
220   // if we reach here it means we are not busy and we can proceed
221   busy = CmiTrue;
222   acksReceived = CmiNumPes() - 1;
223   int count = 0;
224   for (int i=1; i<=nstrats; ++i) {
225     if (strategyTable[i].isNew) {
226       count++;
227     }
228   }
229
230   if (count > 0) {
231     // create the wrapper and link the strategies there
232     StrategyWrapper sw(count);
233     count = 0;
234     for (int i=1; i<=nstrats; ++i) {
235       if (strategyTable[i].isNew) {
236           sw.position[count] = i;
237           sw.replace[count] = CmiFalse;
238           sw.strategy[count] = strategyTable[i].strategy;
239           count++;
240           CkpvAccess(conv_com_object).inSync(i);
241       }
242     }
243
244     // pup the wrapper into a message
245     PUP::sizer ps;
246     ps|sw;
247     char *msg = (char*)CmiAlloc(ps.size() + CmiReservedHeaderSize);
248     PUP::toMem pm(msg+CmiReservedHeaderSize);
249     //int size = ps.size();
250     //pm|size;
251     pm|sw;
252     //for (int i=CmiReservedHeaderSize; i<CmiReservedHeaderSize+size; ++i) {
253     //  CmiPrintf("%x",((char*)msg)[i]);
254     //}
255     //CmiPrintf("\n");
256     CmiSetHandler(msg, CkpvAccess(comlib_receive_table));
257     CmiSyncBroadcastAndFree(ps.size()+CmiReservedHeaderSize, msg);
258
259     /* NOT USED NOW!
260     // call the finalizeCreation after the strategies has been packed
261     for (int i=0; i<strategyTable.size(); ++i) {
262       if (strategyTable[i].isNew) strategyTable[i].strategy->finalizeCreation();
263     }
264     */
265   } else {
266     busy = CmiFalse;
267   }
268 }
269
270 void ConvComlibManager::tableReady() {
271   for (int i=1; i<strategyTable.size(); ++i) {
272     if (strategyTable[i].isInSync) {
273       ComlibPrintf("[%d] ConvComlibManager::tableReady Enabling strategy %d\n",CmiMyPe(),i);
274       strategyTable[i].isInSync = 0;
275       enableStrategy(i);
276     }
277   }
278 }
279
280 #include "ComlibStrategy.h"
281
282 void ConvComlibManager::enableStrategy(int i) {
283   strategyTable[i].isReady = 1;
284   // deliver all the messages in the tmplist to the strategy
285   MessageHolder *mh;
286   while ((mh=strategyTable[i].tmplist.deq()) != NULL) {
287     CharmMessageHolder*cmh = (CharmMessageHolder*)mh;
288     cmh->checkme();
289     cmh->sec_id = cmh->copy_of_sec_id;
290
291 #if DEBUG_MULTICAST
292     int nelem =cmh->sec_id->_nElems;
293     CkPrintf("[%d] enableStrategy() pushing message into strategy %d using copy of sec_id stored when enqueuing message (message=%p nelem=%d)\n",CmiMyPe(),i, mh, nelem);
294 #endif
295
296     strategyTable[i].strategy->insertMessage(mh);
297     //    ((CharmMessageHolder*)mh)->freeCopyOf_sec_id();
298   }
299   for (int j=0; j<strategyTable[i].call_doneInserting; ++j) {
300     strategyTable[i].strategy->doneInserting();
301   }
302   strategyTable[i].call_doneInserting = 0;
303 }
304
305
306
307 /// Handler for dummy messages...
308 /// @TODO: Find out why we need this stupid empty messages and get rid of them (they are used only in router strategies)
309 CkpvDeclare(int, RecvdummyHandle);
310 //handler for dummy messages
311 void recv_dummy(void *msg){
312     ComlibPrintf("Received Dummy %d\n", CmiMyPe());
313     CmiFree(msg);
314 }
315
316 /// @TODO: hack for PipeBroadcastStrategy to register its handlers, fix it.
317 //extern void propagate_handler(void *);
318 extern void propagate_handler_frag(void *);
319
320
321 /** At startup on each processor, this method is called. 
322     This sets up the converse level comlib strategies.
323
324     This is called before any mainchare main functions.
325  */
326 void initConvComlibManager(){ 
327
328     if(!CkpvInitialized(conv_com_object))
329       CkpvInitialize(ConvComlibManager, conv_com_object);
330     
331     
332     if(CkpvAccess(conv_com_object).getInitialized()) {
333       CmiPrintf("Comlib initialized more than once!\n");
334       return;
335     }
336     
337     CkpvInitialize(int, RecvdummyHandle);
338     CkpvAccess(RecvdummyHandle) = CkRegisterHandler((CmiHandler)recv_dummy);
339
340     CkpvInitialize(int, comlib_receive_table);
341     CkpvAccess(comlib_receive_table) = CkRegisterHandler((CmiHandler)comlibReceiveTableHandler);
342     CkpvInitialize(int, comlib_table_received);
343     CkpvAccess(comlib_table_received) = CkRegisterHandler((CmiHandler)comlibTableReceivedHandler);
344     CkpvInitialize(int, comlib_ready);
345     CkpvAccess(comlib_ready) = CkRegisterHandler((CmiHandler)comlibReadyHandler);
346
347     // init strategy specific variables
348
349     // router strategy
350     CkpvInitialize(int, RouterRecvHandle);
351     CkpvAccess(RouterRecvHandle) = CkRegisterHandler((CmiHandler)routerRecvManyCombinedMsg);
352     CkpvInitialize(int, RouterProcHandle);
353     CkpvAccess(RouterProcHandle) = CkRegisterHandler((CmiHandler)routerProcManyCombinedMsg);
354     CkpvInitialize(int, RouterDummyHandle);
355     CkpvAccess(RouterDummyHandle) = CkRegisterHandler((CmiHandler)routerDummyMsg);    
356
357     // streaming strategy
358     CpvInitialize(int, streaming_handler_id);
359     CpvAccess(streaming_handler_id) = CmiRegisterHandler(StreamingHandlerFn);
360
361     // mesh streaming strategy
362     CkpvInitialize(int, streaming_column_handler_id);
363     CkpvAccess(streaming_column_handler_id) = CkRegisterHandler(streaming_column_handler);
364
365     // pipelined broadcast
366     CkpvInitialize(int, pipeline_handler);
367     CkpvInitialize(int, pipeline_frag_handler);
368     CkpvAccess(pipeline_handler) = CkRegisterHandler((CmiHandler)PipelineHandler);
369     CkpvAccess(pipeline_frag_handler) = CkRegisterHandler((CmiHandler)PipelineFragmentHandler);
370     
371     // general handler
372     CkpvInitialize(int, comlib_handler);
373     CkpvAccess(comlib_handler) = CkRegisterHandler((CmiHandler) strategyHandler);
374
375     //PUPable_reg(Strategy); ABSTRACT
376     //PUPable_reg(ConvComlibInstanceHandle);
377     if (CmiMyRank() == 0) {
378           PUPable_reg(RouterStrategy);
379       PUPable_reg(StreamingStrategy);
380       PUPable_reg(MeshStreamingStrategy);
381       PUPable_reg(PipeBroadcastConverse);
382       PUPable_reg(MessageHolder);
383     }
384     CkpvAccess(conv_com_object).setInitialized();
385 }
386
387 // #ifdef __cplusplus
388 // extern "C" {
389 // #endif
390 //   void ComlibInit() {initComlibManager();}
391 // #ifdef __cplusplus
392 // }
393 // #endif
394
395
396 /***************************************************************************
397  * User section:
398  *
399  * Implementation of the functions used by the user
400  ***************************************************************************/
401
402 Strategy *ConvComlibGetStrategy(int loc) {
403     //Calling converse strategy lets Charm++ strategies one strategy
404     //table entry but multiple layers of strategies (Charm on top of Converse).
405     return CkpvAccess(conv_com_object).getStrategy(loc);
406 }
407
408 // Why is this here for? Guess it is for the routers...
409 void ConvComlibScheduleDoneInserting(int loc) {
410   CkpvAccess(conv_com_object).getStrategyTable(loc)->call_doneInserting++;
411 }
412
413
414   
415 void ConvComlibManager::insertMessage(MessageHolder* msg, int instid) {
416   ComlibPrintf("[%d] enqueuing message for strategy %d in tmplist\n",CmiMyPe(),instid);
417 #ifndef CMK_OPTIMIZE
418   if (instid == 0) CmiAbort("Trying to send a message through comlib strategy zero, did you forget to initialize zome variable?\n");
419 #endif
420   if (isReady(instid)) {
421     ComlibPrintf("[%d] insertMessage inserting into strategy\n", CmiMyPe());
422     strategyTable[instid].strategy->insertMessage(msg);
423   }
424   else{
425     // In this case, we will enqueue the messages, so they can be delivered 
426     // once the strategy is initialized and ready to be used. This is tricky
427     // because the message contains a pointer to its "CkSectionID *sec_id".
428     // Somewhere this structure is freed before these messages are dequeued.
429     // Thus we try to copy the sec_id by calling saveCopyOf_sec_id() which
430     // performs a shallow copy of sec_id, which appears to be sufficient.
431     // This is only a shallow copy, so some pointers inside may well be 
432     // dead.
433     //
434     // FIXME: This should really be fixed in some other manner, such 
435     // as not letting the sec_id get deleted in the first place. Or the relevant 
436     // sec_id fields could be copied into the CharmMessageHolder object.
437
438 #if DEBUG_MULTICAST
439     int nelem = ((CharmMessageHolder*)msg)->sec_id->_nElems;
440     void * si = ((CharmMessageHolder*)msg)->sec_id;
441     ComlibPrintf("[%d] insertMessage inserting into tmplist with msg=%p si=%p nelem=%d\n", CmiMyPe(), msg, si, nelem);
442 #endif
443
444     ComlibPrintf("[%d] msg=%p\n", CkMyPe(), dynamic_cast<CharmMessageHolder*>(msg));
445     ((CharmMessageHolder*)msg)->saveCopyOf_sec_id();
446     ComlibPrintf("[%d] insertMessage inserting into tmplist  intid=%d\n", CmiMyPe(), instid);
447     strategyTable[instid].tmplist.enq(msg);
448   }
449
450 }
451   
452
453
454
455 void ConvComlibManager::printDiagnostics(){
456
457
458   //  CkVec<StrategyTableEntry> strategyTable
459   int ready = 0;
460   int tmplistTotal = 0;
461
462   int size = strategyTable.size();
463   //  CkPrintf("[%d]   converse level strategyTable.size()=%d\n", CkMyPe(), size);
464   for(int i=0;i<size;i++){
465     if(strategyTable[i].isReady){
466       ready++;
467       //  CkPrintf("[%d]   strategyTable[%d] is ready\n", CkMyPe(), i);
468     } else {
469       // CkPrintf("[%d]   strategyTable[%d] is not ready\n", CkMyPe(), i);
470     }
471     
472     int nmsg = strategyTable[i].tmplist.length();
473     
474     tmplistTotal += nmsg;
475     //      CkPrintf("[%d]   strategyTable[%d] has %d messages in tmplist\n", CkMyPe(), i, nmsg);
476   }
477
478   if(tmplistTotal>0){
479     CkPrintf("[%d]  %d of %d converse strategies are ready (%d msgs buffered)\n", CkMyPe(), ready, size, tmplistTotal);
480   }
481 }
482
483
484
485
486
487 /*@}*/