bb33c2852744152c45342dde07ea4af5b059fbe0
[charm.git] / src / conv-com / convcomlibmanager.C
1 /**
2    @addtogroup ConvComlib
3    @{
4    @file
5
6    @brief Implementations of convcomlibmanager.h classes. It also defines all
7    the converse handlers to perform the broadcast of the strategies to all the
8    processors (synchronization process). These routines provide this support
9    also to the charm layer.
10
11    @author Sameer Kumar 28/03/04
12    @author Heavily revised, Filippo Gioachin 01/06
13 */
14
15 #include "convcomlibmanager.h"
16 #include "routerstrategy.h"
17 #include "StreamingStrategy.h"
18 #include "MeshStreamingStrategy.h"
19 #include "pipebroadcastconverse.h"
20 #include "converse.h"
21
22 int com_debug=0;
23
24 /// The location in the global table where the converse Comlib manager is located.
25 CkpvDeclare(ConvComlibManager, conv_com_object);
26 /// A pointer to the location of the converse Comlib manager.
27 CkpvDeclare(ConvComlibManager *, conv_com_ptr);
28
29 /***************************************************************************
30  * Handlers section:
31  *
32  * Declarations of all the converse handler IDs used, together with all the
33  * functions they are associated with.
34  ***************************************************************************/
35
36 /** Handler to which send messages inside the comlib framework. To this handler
37     arrive all the messages sent by common strategies without specific needs.
38     Strategies that need particular rerouting of their messages use their own
39     specific handlers. */
40 CkpvDeclare(int, comlib_handler);
41 /// Method invoked upon receipt a message routed through comlib.
42 void *strategyHandler(void *msg) {
43     CmiMsgHeaderBasic *conv_header = (CmiMsgHeaderBasic *) msg;
44     int instid = conv_header->stratid;
45
46 #ifndef CMK_OPTIMIZE
47     // check that the instid is not zero, meaning a possibly uninitialized value
48     if (instid == 0) {
49       CmiAbort("Comlib strategy ID is zero, did you forget to initialize a variable?\n");
50     }
51 #endif
52     
53     // when handling a message always call the lowest level
54     Strategy *strat = ConvComlibGetStrategy(instid);
55     
56     strat->handleMessage(msg);
57     return NULL;
58 }
59
60 /* Strategies synchronization process:
61  *
62  * 1) the strategies are inserted on processor 0 (and possibly in other
63  *    processors with the same order. The strategies are marked as "new"
64  *
65  * 2) when ComlibDoneCreating is called, processor 0 broadcast all the new
66  *    strategies to all the processors, and marks them as "inSync"
67  *
68  * 3) when a processor receives a table it updates its personal table with the
69  *    incoming, it marks all the strategies just arrived as "inSync", and it
70  *    sends an acknowledgement back to processor 0.
71  *
72  * 4) when an acknowledgement is received by processor 0, a counter is
73  *    decremented. When it reaches 0, all the "inSync" strategies are switched
74  *    to status "ready" and they can start working. All the messages in the
75  *    tmplist are delivered. The sync is broadcasted.
76  *
77  * 5) when an acknowledgement is received by a processor other than 0, all the
78  *    "inSync" strategies are switched to "ready" and the messages in tmplist
79  *    are delivered.
80  *
81  * 6) in order to prevent two consecutive table broadcasts to interfere with
82  *    each other, an additional acknowledgement is sent back by each processor
83  *    to processor 0 to allow a new table update to happen.
84  */
85
86 /** Handler to accept the second acknowledgements, after which a new table can
87     be broadcasted. Executed only on processor 0. */
88 CkpvDeclare(int, comlib_ready);
89 /// Method to acknowledge the ready status
90 void *comlibReadyHandler(void *msg) {
91   ComlibPrintf("[%d] Received ready acknowledgement\n",CmiMyPe());
92   CmiAssert(CkpvAccess(conv_com_object).acksReceived > 0);
93   if (--CkpvAccess(conv_com_object).acksReceived == 0) {
94     // ok, we are done. Do we have to broadcast a new table?
95     ComlibPrintf("Strategy table propagation finished\n");
96     CkpvAccess(conv_com_object).busy = CmiFalse;
97     if (CkpvAccess(conv_com_object).doneCreatingScheduled) {
98       CkpvAccess(conv_com_object).doneCreating();
99     }
100   }
101   CmiFree(msg);
102   return NULL;
103 }
104
105 /** Handler to count and accept the acknowledgements of table received. On
106     processor zero this handler counts the number of acks. On all other
107     processors it simply accept it */
108 CkpvDeclare(int, comlib_table_received);
109
110 /// Method invoked upon receipt of an acknowledgement of table received
111 void *comlibTableReceivedHandler(void *msg) {
112   if (CmiMyPe() == 0) {
113         //    CmiPrintf("Num acks to go: %d\n",CkpvAccess(conv_com_object).acksReceived);
114     if (--CkpvAccess(conv_com_object).acksReceived == 0) {
115       CkpvAccess(conv_com_object).tableReady();
116       // reset acksReceived for the second step
117       //CmiPrintf("All acks received, broadcasting message to table_received\n");
118       CkpvAccess(conv_com_object).acksReceived = CmiNumPes() - 1;
119       CmiSyncBroadcastAndFree(CmiReservedHeaderSize, (char*)msg);
120     } else {
121       CmiFree(msg);
122     }
123   } else {
124     CkpvAccess(conv_com_object).tableReady();
125     CmiSetHandler(msg, CkpvAccess(comlib_ready));
126     CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
127   }  
128   return NULL;
129 }
130
131 /** Handler to broadcast all the strategies to all the processors. This is
132     invoked on all processors except processor zero, upon a broadcast message
133     from processor zero (in ComlibDoneInserting). */
134 CkpvDeclare(int, comlib_receive_table);
135
136 /// Method invoked upon receipt of the strategy table
137 void *comlibReceiveTableHandler(void *msg) {
138   // unpack the message into a StrategyWrapper
139   ComlibPrintf("Received new strategy table\n");
140   StrategyWrapper sw;
141   PUP::fromMem pm(((char*)msg)+CmiReservedHeaderSize);
142   pm|sw;
143
144   // insert the strategies into the local table
145   for (int i=0; i<sw.nstrats; ++i) {
146     Strategy *current = CkpvAccess(conv_com_object).getStrategy(sw.position[i]);
147     if (sw.replace[i] && current != NULL) {
148       // delete the old strategy. Since it is requested, it is safe
149       delete current;
150       current = NULL;
151     }
152     if (current == NULL) {
153       // if current is NULL either the strategy has never been set yet, or we
154       // are replacing it
155       CkpvAccess(conv_com_object).setStrategy(sw.position[i], sw.strategy[i]);
156     } else {
157       // let's delete the incoming strategy since it is not used
158       delete sw.strategy[i];
159     }
160     CkpvAccess(conv_com_object).inSync(sw.position[i]);
161   }
162
163   // cheat about the size of the message
164   CmiSetHandler(msg, CkpvAccess(comlib_table_received));
165   CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
166   return NULL;
167 }
168
169 /***************************************************************************
170  * ConvComlibManager section:
171  *
172  * Implementation of the functions defined in the ConvComlibManager class.
173  ***************************************************************************/
174
175 ConvComlibManager::ConvComlibManager(): strategyTable(MAX_NUM_STRATS+1){
176   nstrats = 0;
177   init_flag = CmiFalse;
178   acksReceived = 0;
179   doneCreatingScheduled = CmiFalse;
180   busy = CmiFalse;
181 }
182
183 /** Insert a strategy into the system table, and return a handle to be used to
184     access the strategy later. This handle is the location in the strategy
185     table. From the strategy itself the user can get a handle for its usage. */
186 int ConvComlibManager::insertStrategy(Strategy *s) {
187
188   // This is not right, remove this stupid restriction!
189   if(nstrats >= MAX_NUM_STRATS)
190     CmiAbort("Too Many strategies\n");
191
192   int index = ++nstrats;
193   StrategyTableEntry &st = strategyTable[index];
194   
195   if(st.strategy != NULL) CmiAbort("Trying to insert a strategy over another one!");
196   
197   st.strategy = s;
198   st.isNew = 1;
199   st.bracketedSetupFinished = 2*CkNumPes();
200   
201   //s->setInstance(index);
202   
203   // if the strategy is pure converse or pure charm the following
204   // line is a duplication, but if a charm strategy embed a converse
205   // strategy it is necessary to set the instanceID in both
206   //s->getConverseStrategy()->setInstance(index); DEPRECATED
207
208   return index;
209 }
210
211 void ConvComlibManager::doneCreating() {
212   ComlibPrintf("Called doneCreating\n");
213   if (busy) {
214     // we have to delay the table broadcast because we are in the middle of another one
215     doneCreatingScheduled = CmiTrue;
216     return;
217   }
218   // if we reach here it means we are not busy and we can proceed
219   busy = CmiTrue;
220   acksReceived = CmiNumPes() - 1;
221   int count = 0;
222   for (int i=1; i<=nstrats; ++i) {
223     if (strategyTable[i].isNew) {
224       count++;
225     }
226   }
227
228   if (count > 0) {
229     // create the wrapper and link the strategies there
230     StrategyWrapper sw(count);
231     count = 0;
232     for (int i=1; i<=nstrats; ++i) {
233       if (strategyTable[i].isNew) {
234           sw.position[count] = i;
235           sw.replace[count] = CmiFalse;
236           sw.strategy[count] = strategyTable[i].strategy;
237           count++;
238           CkpvAccess(conv_com_object).inSync(i);
239       }
240     }
241
242     // pup the wrapper into a message
243     PUP::sizer ps;
244     ps|sw;
245     char *msg = (char*)CmiAlloc(ps.size() + CmiReservedHeaderSize);
246     PUP::toMem pm(msg+CmiReservedHeaderSize);
247     //int size = ps.size();
248     //pm|size;
249     pm|sw;
250     //for (int i=CmiReservedHeaderSize; i<CmiReservedHeaderSize+size; ++i) {
251     //  CmiPrintf("%x",((char*)msg)[i]);
252     //}
253     //CmiPrintf("\n");
254     CmiSetHandler(msg, CkpvAccess(comlib_receive_table));
255     CmiSyncBroadcastAndFree(ps.size()+CmiReservedHeaderSize, msg);
256
257     /* NOT USED NOW!
258     // call the finalizeCreation after the strategies has been packed
259     for (int i=0; i<strategyTable.size(); ++i) {
260       if (strategyTable[i].isNew) strategyTable[i].strategy->finalizeCreation();
261     }
262     */
263   } else {
264     busy = CmiFalse;
265   }
266 }
267
268 void ConvComlibManager::tableReady() {
269   for (int i=1; i<strategyTable.size(); ++i) {
270     if (strategyTable[i].isInSync) {
271       ComlibPrintf("[%d] ConvComlibManager::tableReady Enabling strategy %d\n",CmiMyPe(),i);
272       strategyTable[i].isInSync = 0;
273       enableStrategy(i);
274     }
275   }
276 }
277
278 #include "ComlibStrategy.h"
279
280 void ConvComlibManager::enableStrategy(int i) {
281   strategyTable[i].isReady = 1;
282   // deliver all the messages in the tmplist to the strategy
283   MessageHolder *mh;
284   while ((mh=strategyTable[i].tmplist.deq()) != NULL) {
285     CharmMessageHolder*cmh = (CharmMessageHolder*)mh;
286     cmh->checkme();
287     cmh->sec_id = cmh->copy_of_sec_id;
288
289 #if DEBUG_MULTICAST
290     int nelem =cmh->sec_id->_nElems;
291     CkPrintf("[%d] enableStrategy() pushing message into strategy %d using copy of sec_id stored when enqueuing message (message=%p nelem=%d)\n",CmiMyPe(),i, mh, nelem);
292 #endif
293
294     strategyTable[i].strategy->insertMessage(mh);
295     //    ((CharmMessageHolder*)mh)->freeCopyOf_sec_id();
296   }
297   for (int j=0; j<strategyTable[i].call_doneInserting; ++j) {
298     strategyTable[i].strategy->doneInserting();
299   }
300   strategyTable[i].call_doneInserting = 0;
301 }
302
303
304
305 /// Handler for dummy messages...
306 /// @TODO: Find out why we need this stupid empty messages and get rid of them (they are used only in router strategies)
307 CkpvDeclare(int, RecvdummyHandle);
308 //handler for dummy messages
309 void recv_dummy(void *msg){
310     ComlibPrintf("Received Dummy %d\n", CmiMyPe());
311     CmiFree(msg);
312 }
313
314 /// @TODO: hack for PipeBroadcastStrategy to register its handlers, fix it.
315 //extern void propagate_handler(void *);
316 extern void propagate_handler_frag(void *);
317
318 /// An initialization routine which does preliminary initialization of the 
319 /// Converse commlib manager. 
320 void initComlibManager(){ 
321
322     if(!CkpvInitialized(conv_com_object))
323       CkpvInitialize(ConvComlibManager, conv_com_object);
324     
325     if(!CkpvInitialized(conv_com_ptr))
326       CkpvInitialize(ConvComlibManager *, conv_com_ptr);
327     
328     if(CkpvAccess(conv_com_object).getInitialized()) {
329       CmiPrintf("Comlib initialized more than once!\n");
330       return;
331     }
332     
333     CkpvAccess(conv_com_ptr) = &(CkpvAccess(conv_com_object));
334     
335     CkpvInitialize(int, RecvdummyHandle);
336     CkpvAccess(RecvdummyHandle) = CkRegisterHandler((CmiHandler)recv_dummy);
337
338     CkpvInitialize(int, comlib_receive_table);
339     CkpvAccess(comlib_receive_table) = CkRegisterHandler((CmiHandler)comlibReceiveTableHandler);
340     CkpvInitialize(int, comlib_table_received);
341     CkpvAccess(comlib_table_received) = CkRegisterHandler((CmiHandler)comlibTableReceivedHandler);
342     CkpvInitialize(int, comlib_ready);
343     CkpvAccess(comlib_ready) = CkRegisterHandler((CmiHandler)comlibReadyHandler);
344
345     // init strategy specific variables
346
347     // router strategy
348     CkpvInitialize(int, RouterRecvHandle);
349     CkpvAccess(RouterRecvHandle) = CkRegisterHandler((CmiHandler)routerRecvManyCombinedMsg);
350     CkpvInitialize(int, RouterProcHandle);
351     CkpvAccess(RouterProcHandle) = CkRegisterHandler((CmiHandler)routerProcManyCombinedMsg);
352     CkpvInitialize(int, RouterDummyHandle);
353     CkpvAccess(RouterDummyHandle) = CkRegisterHandler((CmiHandler)routerDummyMsg);    
354
355     // streaming strategy
356     CkpvInitialize(int, streaming_handler_id);
357     CkpvAccess(streaming_handler_id) = CkRegisterHandler(StreamingHandlerFn);
358
359     // mesh streaming strategy
360     CkpvInitialize(int, streaming_column_handler_id);
361     CkpvAccess(streaming_column_handler_id) = CkRegisterHandler(streaming_column_handler);
362
363     // pipelined broadcast
364     CkpvInitialize(int, pipeline_handler);
365     CkpvInitialize(int, pipeline_frag_handler);
366     CkpvAccess(pipeline_handler) = CkRegisterHandler((CmiHandler)PipelineHandler);
367     CkpvAccess(pipeline_frag_handler) = CkRegisterHandler((CmiHandler)PipelineFragmentHandler);
368     
369     // general handler
370     CkpvInitialize(int, comlib_handler);
371     CkpvAccess(comlib_handler) = CkRegisterHandler((CmiHandler) strategyHandler);
372
373     //PUPable_reg(Strategy); ABSTRACT
374     //PUPable_reg(ConvComlibInstanceHandle);
375     if (CmiMyRank() == 0) {
376           PUPable_reg(RouterStrategy);
377       PUPable_reg(StreamingStrategy);
378       PUPable_reg(MeshStreamingStrategy);
379       PUPable_reg(PipeBroadcastConverse);
380       PUPable_reg(MessageHolder);
381     }
382     CkpvAccess(conv_com_object).setInitialized();
383 }
384
385 #ifdef __cplusplus
386 extern "C" {
387 #endif
388   void ComlibInit() {initComlibManager();}
389 #ifdef __cplusplus
390 }
391 #endif
392
393
394 /***************************************************************************
395  * User section:
396  *
397  * Implementation of the functions used by the user
398  ***************************************************************************/
399
400 Strategy *ConvComlibGetStrategy(int loc) {
401     //Calling converse strategy lets Charm++ strategies one strategy
402     //table entry but multiple layers of strategies (Charm on top of Converse).
403     return (CkpvAccess(conv_com_ptr))->getStrategy(loc);
404 }
405
406 // Why is this here for? Guess it is for the routers...
407 void ConvComlibScheduleDoneInserting(int loc) {
408   CkpvAccess(conv_com_ptr)->getStrategyTable(loc)->call_doneInserting++;
409 }
410
411
412   
413 void ConvComlibManager::insertMessage(MessageHolder* msg, int instid) {
414   ComlibPrintf("[%d] enqueuing message for strategy %d in tmplist\n",CmiMyPe(),instid);
415 #ifndef CMK_OPTIMIZE
416   if (instid == 0) CmiAbort("Trying to send a message through comlib strategy zero, did you forget to initialize zome variable?\n");
417 #endif
418   if (isReady(instid)) {
419     ComlibPrintf("[%d] insertMessage inserting into strategy\n", CmiMyPe());
420     strategyTable[instid].strategy->insertMessage(msg);
421   }
422   else{
423     // In this case, we will enqueue the messages, so they can be delivered 
424     // once the strategy is initialized and ready to be used. This is tricky
425     // because the message contains a pointer to its "CkSectionID *sec_id".
426     // Somewhere this structure is freed before these messages are dequeued.
427     // Thus we try to copy the sec_id by calling saveCopyOf_sec_id() which
428     // performs a shallow copy of sec_id, which appears to be sufficient.
429     // This is only a shallow copy, so some pointers inside may well be 
430     // dead.
431     //
432     // FIXME: This should really be fixed in some other manner, such 
433     // as not letting the sec_id get deleted in the first place. Or the relevant 
434     // sec_id fields could be copied into the CharmMessageHolder object.
435
436 #if DEBUG_MULTICAST
437     int nelem = ((CharmMessageHolder*)msg)->sec_id->_nElems;
438     void * si = ((CharmMessageHolder*)msg)->sec_id;
439     ComlibPrintf("[%d] insertMessage inserting into tmplist with msg=%p si=%p nelem=%d\n", CmiMyPe(), msg, si, nelem);
440 #endif
441
442     ComlibPrintf("[%d] msg=%p\n", CkMyPe(), dynamic_cast<CharmMessageHolder*>(msg));
443     ((CharmMessageHolder*)msg)->saveCopyOf_sec_id();
444     ComlibPrintf("[%d] insertMessage inserting into tmplist  intid=%d\n", CmiMyPe(), instid);
445     strategyTable[instid].tmplist.enq(msg);
446   }
447
448 }
449   
450
451
452
453 void ConvComlibManager::printDiagnostics(){
454
455
456   //  CkVec<StrategyTableEntry> strategyTable
457   int ready = 0;
458   int tmplistTotal = 0;
459
460   int size = strategyTable.size();
461   //  CkPrintf("[%d]   converse level strategyTable.size()=%d\n", CkMyPe(), size);
462   for(int i=0;i<size;i++){
463     if(strategyTable[i].isReady){
464       ready++;
465       //  CkPrintf("[%d]   strategyTable[%d] is ready\n", CkMyPe(), i);
466     } else {
467       // CkPrintf("[%d]   strategyTable[%d] is not ready\n", CkMyPe(), i);
468     }
469     
470     int nmsg = strategyTable[i].tmplist.length();
471     
472     tmplistTotal += nmsg;
473     //      CkPrintf("[%d]   strategyTable[%d] has %d messages in tmplist\n", CkMyPe(), i, nmsg);
474   }
475
476   if(tmplistTotal>0){
477     CkPrintf("[%d]  %d of %d converse strategies are ready (%d msgs buffered)\n", CkMyPe(), ready, size, tmplistTotal);
478   }
479 }
480
481
482
483
484
485 /*@}*/