Moving some comlib startup code to an initproc routine. This code needs to execute...
[charm.git] / src / conv-com / convcomlibmanager.C
1 /**
2    @addtogroup ConvComlib
3    @{
4    @file
5
6    @brief Implementations of convcomlibmanager.h classes. It also defines all
7    the converse handlers to perform the broadcast of the strategies to all the
8    processors (synchronization process). These routines provide this support
9    also to the charm layer.
10
11    @author Sameer Kumar 28/03/04
12    @author Heavily revised, Filippo Gioachin 01/06
13 */
14
15 #include "convcomlibmanager.h"
16 #include "routerstrategy.h"
17 #include "StreamingStrategy.h"
18 #include "MeshStreamingStrategy.h"
19 #include "pipebroadcastconverse.h"
20 #include "converse.h"
21
22 int com_debug=0;
23
24 /// The location in the global table where the converse Comlib manager is located.
25 CkpvDeclare(ConvComlibManager, conv_com_object);
26
27 /***************************************************************************
28  * Handlers section:
29  *
30  * Declarations of all the converse handler IDs used, together with all the
31  * functions they are associated with.
32  ***************************************************************************/
33
34 /** Handler to which send messages inside the comlib framework. To this handler
35     arrive all the messages sent by common strategies without specific needs.
36     Strategies that need particular rerouting of their messages use their own
37     specific handlers. */
38 CkpvDeclare(int, comlib_handler);
39 /// Method invoked upon receipt a message routed through comlib.
40 void *strategyHandler(void *msg) {
41     CmiMsgHeaderBasic *conv_header = (CmiMsgHeaderBasic *) msg;
42     int instid = conv_header->stratid;
43
44 #ifndef CMK_OPTIMIZE
45     // check that the instid is not zero, meaning a possibly uninitialized value
46     if (instid == 0) {
47       CmiAbort("Comlib strategy ID is zero, did you forget to initialize a variable?\n");
48     }
49 #endif
50     
51     // when handling a message always call the lowest level
52     Strategy *strat = ConvComlibGetStrategy(instid);
53     
54     strat->handleMessage(msg);
55     return NULL;
56 }
57
58 /* Strategies synchronization process:
59  *
60  * 1) the strategies are inserted on processor 0 (and possibly in other
61  *    processors with the same order. The strategies are marked as "new"
62  *
63  * 2) when ComlibDoneCreating is called, processor 0 broadcast all the new
64  *    strategies to all the processors, and marks them as "inSync"
65  *
66  * 3) when a processor receives a table it updates its personal table with the
67  *    incoming, it marks all the strategies just arrived as "inSync", and it
68  *    sends an acknowledgement back to processor 0.
69  *
70  * 4) when an acknowledgement is received by processor 0, a counter is
71  *    decremented. When it reaches 0, all the "inSync" strategies are switched
72  *    to status "ready" and they can start working. All the messages in the
73  *    tmplist are delivered. The sync is broadcasted.
74  *
75  * 5) when an acknowledgement is received by a processor other than 0, all the
76  *    "inSync" strategies are switched to "ready" and the messages in tmplist
77  *    are delivered.
78  *
79  * 6) in order to prevent two consecutive table broadcasts to interfere with
80  *    each other, an additional acknowledgement is sent back by each processor
81  *    to processor 0 to allow a new table update to happen.
82  */
83
84 /** Handler to accept the second acknowledgements, after which a new table can
85     be broadcasted. Executed only on processor 0. */
86 CkpvDeclare(int, comlib_ready);
87 /// Method to acknowledge the ready status
88 void *comlibReadyHandler(void *msg) {
89   ComlibPrintf("[%d] Received ready acknowledgement\n",CmiMyPe());
90   CmiAssert(CkpvAccess(conv_com_object).acksReceived > 0);
91   if (--CkpvAccess(conv_com_object).acksReceived == 0) {
92     // ok, we are done. Do we have to broadcast a new table?
93     ComlibPrintf("Strategy table propagation finished\n");
94     CkpvAccess(conv_com_object).busy = CmiFalse;
95     if (CkpvAccess(conv_com_object).doneCreatingScheduled) {
96       CkpvAccess(conv_com_object).doneCreating();
97     }
98   }
99   CmiFree(msg);
100   return NULL;
101 }
102
103 /** Handler to count and accept the acknowledgements of table received. On
104     processor zero this handler counts the number of acks. On all other
105     processors it simply accept it */
106 CkpvDeclare(int, comlib_table_received);
107
108 /// Method invoked upon receipt of an acknowledgement of table received
109 void *comlibTableReceivedHandler(void *msg) {
110   if (CmiMyPe() == 0) {
111         //    CmiPrintf("Num acks to go: %d\n",CkpvAccess(conv_com_object).acksReceived);
112     if (--CkpvAccess(conv_com_object).acksReceived == 0) {
113       CkpvAccess(conv_com_object).tableReady();
114       // reset acksReceived for the second step
115       //CmiPrintf("All acks received, broadcasting message to table_received\n");
116       CkpvAccess(conv_com_object).acksReceived = CmiNumPes() - 1;
117       CmiSyncBroadcastAndFree(CmiReservedHeaderSize, (char*)msg);
118     } else {
119       CmiFree(msg);
120     }
121   } else {
122     CkpvAccess(conv_com_object).tableReady();
123     CmiSetHandler(msg, CkpvAccess(comlib_ready));
124     CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
125   }  
126   return NULL;
127 }
128
129 /** Handler to broadcast all the strategies to all the processors. This is
130     invoked on all processors except processor zero, upon a broadcast message
131     from processor zero (in ComlibDoneInserting). */
132 CkpvDeclare(int, comlib_receive_table);
133
134 /// Method invoked upon receipt of the strategy table
135 void *comlibReceiveTableHandler(void *msg) {
136   // unpack the message into a StrategyWrapper
137   ComlibPrintf("Received new strategy table\n");
138   StrategyWrapper sw;
139   PUP::fromMem pm(((char*)msg)+CmiReservedHeaderSize);
140   pm|sw;
141
142   // insert the strategies into the local table
143   for (int i=0; i<sw.nstrats; ++i) {
144     Strategy *current = CkpvAccess(conv_com_object).getStrategy(sw.position[i]);
145     if (sw.replace[i] && current != NULL) {
146       // delete the old strategy. Since it is requested, it is safe
147       delete current;
148       current = NULL;
149     }
150     if (current == NULL) {
151       // if current is NULL either the strategy has never been set yet, or we
152       // are replacing it
153       CkpvAccess(conv_com_object).setStrategy(sw.position[i], sw.strategy[i]);
154     } else {
155       // let's delete the incoming strategy since it is not used
156       delete sw.strategy[i];
157     }
158     CkpvAccess(conv_com_object).inSync(sw.position[i]);
159   }
160
161   // cheat about the size of the message
162   CmiSetHandler(msg, CkpvAccess(comlib_table_received));
163   CmiSyncSendAndFree(0, CmiReservedHeaderSize, (char*)msg);
164   return NULL;
165 }
166
167 /***************************************************************************
168  * ConvComlibManager section:
169  *
170  * Implementation of the functions defined in the ConvComlibManager class.
171  ***************************************************************************/
172
173 ConvComlibManager::ConvComlibManager(): strategyTable(MAX_NUM_STRATS+1){
174   nstrats = 0;
175   init_flag = CmiFalse;
176   acksReceived = 0;
177   doneCreatingScheduled = CmiFalse;
178   busy = CmiFalse;
179 }
180
181 /** Insert a strategy into the system table, and return a handle to be used to
182     access the strategy later. This handle is the location in the strategy
183     table. From the strategy itself the user can get a handle for its usage. */
184 int ConvComlibManager::insertStrategy(Strategy *s) {
185
186   // This is not right, remove this stupid restriction!
187   if(nstrats >= MAX_NUM_STRATS)
188     CmiAbort("Too Many strategies\n");
189
190   int index = ++nstrats;
191   StrategyTableEntry &st = strategyTable[index];
192   
193   if(st.strategy != NULL) CmiAbort("Trying to insert a strategy over another one!");
194   
195   st.strategy = s;
196   st.isNew = 1;
197   st.bracketedSetupFinished = 2*CkNumPes();
198   
199   //s->setInstance(index);
200   
201   // if the strategy is pure converse or pure charm the following
202   // line is a duplication, but if a charm strategy embed a converse
203   // strategy it is necessary to set the instanceID in both
204   //s->getConverseStrategy()->setInstance(index); DEPRECATED
205
206   return index;
207 }
208
209 void ConvComlibManager::doneCreating() {
210   ComlibPrintf("Called doneCreating\n");
211   if (busy) {
212     // we have to delay the table broadcast because we are in the middle of another one
213     doneCreatingScheduled = CmiTrue;
214     return;
215   }
216   // if we reach here it means we are not busy and we can proceed
217   busy = CmiTrue;
218   acksReceived = CmiNumPes() - 1;
219   int count = 0;
220   for (int i=1; i<=nstrats; ++i) {
221     if (strategyTable[i].isNew) {
222       count++;
223     }
224   }
225
226   if (count > 0) {
227     // create the wrapper and link the strategies there
228     StrategyWrapper sw(count);
229     count = 0;
230     for (int i=1; i<=nstrats; ++i) {
231       if (strategyTable[i].isNew) {
232           sw.position[count] = i;
233           sw.replace[count] = CmiFalse;
234           sw.strategy[count] = strategyTable[i].strategy;
235           count++;
236           CkpvAccess(conv_com_object).inSync(i);
237       }
238     }
239
240     // pup the wrapper into a message
241     PUP::sizer ps;
242     ps|sw;
243     char *msg = (char*)CmiAlloc(ps.size() + CmiReservedHeaderSize);
244     PUP::toMem pm(msg+CmiReservedHeaderSize);
245     //int size = ps.size();
246     //pm|size;
247     pm|sw;
248     //for (int i=CmiReservedHeaderSize; i<CmiReservedHeaderSize+size; ++i) {
249     //  CmiPrintf("%x",((char*)msg)[i]);
250     //}
251     //CmiPrintf("\n");
252     CmiSetHandler(msg, CkpvAccess(comlib_receive_table));
253     CmiSyncBroadcastAndFree(ps.size()+CmiReservedHeaderSize, msg);
254
255     /* NOT USED NOW!
256     // call the finalizeCreation after the strategies has been packed
257     for (int i=0; i<strategyTable.size(); ++i) {
258       if (strategyTable[i].isNew) strategyTable[i].strategy->finalizeCreation();
259     }
260     */
261   } else {
262     busy = CmiFalse;
263   }
264 }
265
266 void ConvComlibManager::tableReady() {
267   for (int i=1; i<strategyTable.size(); ++i) {
268     if (strategyTable[i].isInSync) {
269       ComlibPrintf("[%d] ConvComlibManager::tableReady Enabling strategy %d\n",CmiMyPe(),i);
270       strategyTable[i].isInSync = 0;
271       enableStrategy(i);
272     }
273   }
274 }
275
276 #include "ComlibStrategy.h"
277
278 void ConvComlibManager::enableStrategy(int i) {
279   strategyTable[i].isReady = 1;
280   // deliver all the messages in the tmplist to the strategy
281   MessageHolder *mh;
282   while ((mh=strategyTable[i].tmplist.deq()) != NULL) {
283     CharmMessageHolder*cmh = (CharmMessageHolder*)mh;
284     cmh->checkme();
285     cmh->sec_id = cmh->copy_of_sec_id;
286
287 #if DEBUG_MULTICAST
288     int nelem =cmh->sec_id->_nElems;
289     CkPrintf("[%d] enableStrategy() pushing message into strategy %d using copy of sec_id stored when enqueuing message (message=%p nelem=%d)\n",CmiMyPe(),i, mh, nelem);
290 #endif
291
292     strategyTable[i].strategy->insertMessage(mh);
293     //    ((CharmMessageHolder*)mh)->freeCopyOf_sec_id();
294   }
295   for (int j=0; j<strategyTable[i].call_doneInserting; ++j) {
296     strategyTable[i].strategy->doneInserting();
297   }
298   strategyTable[i].call_doneInserting = 0;
299 }
300
301
302
303 /// Handler for dummy messages...
304 /// @TODO: Find out why we need this stupid empty messages and get rid of them (they are used only in router strategies)
305 CkpvDeclare(int, RecvdummyHandle);
306 //handler for dummy messages
307 void recv_dummy(void *msg){
308     ComlibPrintf("Received Dummy %d\n", CmiMyPe());
309     CmiFree(msg);
310 }
311
312 /// @TODO: hack for PipeBroadcastStrategy to register its handlers, fix it.
313 //extern void propagate_handler(void *);
314 extern void propagate_handler_frag(void *);
315
316
317 /** At startup on each processor, this method is called. 
318     This sets up the converse level comlib strategies.
319
320     This is called before any mainchare main functions.
321  */
322 void initConvComlibManager(){ 
323
324     if(!CkpvInitialized(conv_com_object))
325       CkpvInitialize(ConvComlibManager, conv_com_object);
326     
327     
328     if(CkpvAccess(conv_com_object).getInitialized()) {
329       CmiPrintf("Comlib initialized more than once!\n");
330       return;
331     }
332     
333     CkpvInitialize(int, RecvdummyHandle);
334     CkpvAccess(RecvdummyHandle) = CkRegisterHandler((CmiHandler)recv_dummy);
335
336     CkpvInitialize(int, comlib_receive_table);
337     CkpvAccess(comlib_receive_table) = CkRegisterHandler((CmiHandler)comlibReceiveTableHandler);
338     CkpvInitialize(int, comlib_table_received);
339     CkpvAccess(comlib_table_received) = CkRegisterHandler((CmiHandler)comlibTableReceivedHandler);
340     CkpvInitialize(int, comlib_ready);
341     CkpvAccess(comlib_ready) = CkRegisterHandler((CmiHandler)comlibReadyHandler);
342
343     // init strategy specific variables
344
345     // router strategy
346     CkpvInitialize(int, RouterRecvHandle);
347     CkpvAccess(RouterRecvHandle) = CkRegisterHandler((CmiHandler)routerRecvManyCombinedMsg);
348     CkpvInitialize(int, RouterProcHandle);
349     CkpvAccess(RouterProcHandle) = CkRegisterHandler((CmiHandler)routerProcManyCombinedMsg);
350     CkpvInitialize(int, RouterDummyHandle);
351     CkpvAccess(RouterDummyHandle) = CkRegisterHandler((CmiHandler)routerDummyMsg);    
352
353     // streaming strategy
354     CkpvInitialize(int, streaming_handler_id);
355     CkpvAccess(streaming_handler_id) = CkRegisterHandler(StreamingHandlerFn);
356
357     // mesh streaming strategy
358     CkpvInitialize(int, streaming_column_handler_id);
359     CkpvAccess(streaming_column_handler_id) = CkRegisterHandler(streaming_column_handler);
360
361     // pipelined broadcast
362     CkpvInitialize(int, pipeline_handler);
363     CkpvInitialize(int, pipeline_frag_handler);
364     CkpvAccess(pipeline_handler) = CkRegisterHandler((CmiHandler)PipelineHandler);
365     CkpvAccess(pipeline_frag_handler) = CkRegisterHandler((CmiHandler)PipelineFragmentHandler);
366     
367     // general handler
368     CkpvInitialize(int, comlib_handler);
369     CkpvAccess(comlib_handler) = CkRegisterHandler((CmiHandler) strategyHandler);
370
371     //PUPable_reg(Strategy); ABSTRACT
372     //PUPable_reg(ConvComlibInstanceHandle);
373     if (CmiMyRank() == 0) {
374           PUPable_reg(RouterStrategy);
375       PUPable_reg(StreamingStrategy);
376       PUPable_reg(MeshStreamingStrategy);
377       PUPable_reg(PipeBroadcastConverse);
378       PUPable_reg(MessageHolder);
379     }
380     CkpvAccess(conv_com_object).setInitialized();
381 }
382
383 // #ifdef __cplusplus
384 // extern "C" {
385 // #endif
386 //   void ComlibInit() {initComlibManager();}
387 // #ifdef __cplusplus
388 // }
389 // #endif
390
391
392 /***************************************************************************
393  * User section:
394  *
395  * Implementation of the functions used by the user
396  ***************************************************************************/
397
398 Strategy *ConvComlibGetStrategy(int loc) {
399     //Calling converse strategy lets Charm++ strategies one strategy
400     //table entry but multiple layers of strategies (Charm on top of Converse).
401     return CkpvAccess(conv_com_object).getStrategy(loc);
402 }
403
404 // Why is this here for? Guess it is for the routers...
405 void ConvComlibScheduleDoneInserting(int loc) {
406   CkpvAccess(conv_com_object).getStrategyTable(loc)->call_doneInserting++;
407 }
408
409
410   
411 void ConvComlibManager::insertMessage(MessageHolder* msg, int instid) {
412   ComlibPrintf("[%d] enqueuing message for strategy %d in tmplist\n",CmiMyPe(),instid);
413 #ifndef CMK_OPTIMIZE
414   if (instid == 0) CmiAbort("Trying to send a message through comlib strategy zero, did you forget to initialize zome variable?\n");
415 #endif
416   if (isReady(instid)) {
417     ComlibPrintf("[%d] insertMessage inserting into strategy\n", CmiMyPe());
418     strategyTable[instid].strategy->insertMessage(msg);
419   }
420   else{
421     // In this case, we will enqueue the messages, so they can be delivered 
422     // once the strategy is initialized and ready to be used. This is tricky
423     // because the message contains a pointer to its "CkSectionID *sec_id".
424     // Somewhere this structure is freed before these messages are dequeued.
425     // Thus we try to copy the sec_id by calling saveCopyOf_sec_id() which
426     // performs a shallow copy of sec_id, which appears to be sufficient.
427     // This is only a shallow copy, so some pointers inside may well be 
428     // dead.
429     //
430     // FIXME: This should really be fixed in some other manner, such 
431     // as not letting the sec_id get deleted in the first place. Or the relevant 
432     // sec_id fields could be copied into the CharmMessageHolder object.
433
434 #if DEBUG_MULTICAST
435     int nelem = ((CharmMessageHolder*)msg)->sec_id->_nElems;
436     void * si = ((CharmMessageHolder*)msg)->sec_id;
437     ComlibPrintf("[%d] insertMessage inserting into tmplist with msg=%p si=%p nelem=%d\n", CmiMyPe(), msg, si, nelem);
438 #endif
439
440     ComlibPrintf("[%d] msg=%p\n", CkMyPe(), dynamic_cast<CharmMessageHolder*>(msg));
441     ((CharmMessageHolder*)msg)->saveCopyOf_sec_id();
442     ComlibPrintf("[%d] insertMessage inserting into tmplist  intid=%d\n", CmiMyPe(), instid);
443     strategyTable[instid].tmplist.enq(msg);
444   }
445
446 }
447   
448
449
450
451 void ConvComlibManager::printDiagnostics(){
452
453
454   //  CkVec<StrategyTableEntry> strategyTable
455   int ready = 0;
456   int tmplistTotal = 0;
457
458   int size = strategyTable.size();
459   //  CkPrintf("[%d]   converse level strategyTable.size()=%d\n", CkMyPe(), size);
460   for(int i=0;i<size;i++){
461     if(strategyTable[i].isReady){
462       ready++;
463       //  CkPrintf("[%d]   strategyTable[%d] is ready\n", CkMyPe(), i);
464     } else {
465       // CkPrintf("[%d]   strategyTable[%d] is not ready\n", CkMyPe(), i);
466     }
467     
468     int nmsg = strategyTable[i].tmplist.length();
469     
470     tmplistTotal += nmsg;
471     //      CkPrintf("[%d]   strategyTable[%d] has %d messages in tmplist\n", CkMyPe(), i, nmsg);
472   }
473
474   if(tmplistTotal>0){
475     CkPrintf("[%d]  %d of %d converse strategies are ready (%d msgs buffered)\n", CkMyPe(), ready, size, tmplistTotal);
476   }
477 }
478
479
480
481
482
483 /*@}*/