Fixing some comlib bugs that could possibly occur due to the way the src and dest...
[charm.git] / src / conv-com / routerstrategy.C
1 /**
2    @addtogroup ConvComlibRouter
3    @{
4    @file
5 */
6
7
8 #include "routerstrategy.h"
9
10 #include "gridrouter.h"
11 #include "graphrouter.h"
12 #include "hypercuberouter.h"
13 #include "treerouter.h"
14 #include "3dgridrouter.h"
15 #include "prefixrouter.h"
16 #include "converse.h"
17 #include "charm++.h"
18
//Handlers that call the entry functions of routers
//Refer to router.h for details on these entry functions
21
22 CkpvDeclare(int, RouterProcHandle);
23 ///Correspods to Router::ProcManyMsg
24 void routerProcManyCombinedMsg(char *msg) {
25     //comID id;
26     int instance_id;
27
28     ComlibPrintf("In Proc combined message at %d\n", CkMyPe());
29     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
30
31     //Comid specific -- to change to a better reading!
32     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
33            , sizeof(int));
34
35     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(instance_id);
36     s->getRouter()->ProcManyMsg(s->getComID(), msg);
37 }
38
39 CkpvDeclare(int, RouterDummyHandle);
40 ///Correspods to Router::DummyEP
41 void routerDummyMsg(DummyMsg *m) {
42     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(m->id.instanceID);
43     s->getRouter()->DummyEP(m->id, m->magic);
44 }
45
46 CkpvDeclare(int, RouterRecvHandle);
47 ///Correspods to Router::RecvManyMsg
48 void routerRecvManyCombinedMsg(char *msg) {
49     //comID id;
50     int instance_id;
51     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
52     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
53     
54     //Comid specific -- to change to a better reading!
55     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
56            , sizeof(int));
57
58     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(instance_id);
59     s->getRouter()->RecvManyMsg(s->getComID(), msg);
60 }
61
62 /* DEPRECATED: method notifyDone substitutes it
63 void doneHandler(DummyMsg *m){
64     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
65     
66     ((RouterStrategy *)s)->Done(m);
67 }
68 */
69
70 void RouterStrategy::setReverseMap(){
71     int pcount;
72     for(pcount = 0; pcount < CkNumPes(); pcount++)
73         procMap[pcount] = -1;
74
75     //All processors not in the domain will point to -1
76     for(pcount = 0; pcount < npes; pcount++) {
77         if (pelist[pcount] == CkMyPe())
78             myPe = pcount;
79
80         procMap[pelist[pcount]] = pcount;
81     }
82 }
83
84 RouterStrategy::RouterStrategy(int stratid) {
85   ComlibPrintf("[%d] RouterStrategy protected constructor\n",CkMyPe());
86   setType(CONVERSE_STRATEGY);
87   id.instanceID = 0;
88   id.isAllToAll = 0;
89   id.refno = 0;
90   doneHandle = 0;
91   routerIDsaved = stratid;
92   router = NULL;
93   pelist = NULL;
94   bcast_pemap = NULL;
95   procMap = new int[CkNumPes()];
96   doneFlag = 1;
97   bufferedDoneInserting = 0;
98 }
99
100 RouterStrategy::RouterStrategy(int stratid, int handle, int _nsrc, int *_srclist,
101                                int _ndest, int *_destlist) : Strategy() {
102
103   ComlibPrintf("[%d] RouterStrategy constructor\n",CkMyPe());
104
105     setType(CONVERSE_STRATEGY);
106
107     //CkpvInitialize(int, RecvHandle);
108     //CkpvInitialize(int, ProcHandle);
109     //CkpvInitialize(int, DummyHandle);
110
111     id.instanceID = 0; //Set later in doneInserting
112     
113     id.isAllToAll = 0;
114     id.refno = 0;
115
116     /*
117     CkpvAccess(RecvHandle) =
118         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
119     CkpvAccess(ProcHandle) =
120         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
121     CkpvAccess(DummyHandle) = 
122         CkRegisterHandler((CmiHandler)dummyEP);    
123     */
124
125     //myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
126
127     // Iteration done handle
128     doneHandle = handle;
129
130     routerIDsaved = stratid;
131
132     router = NULL;
133     pelist = NULL;
134     bcast_pemap = NULL;
135     procMap = new int[CkNumPes()];    
136
137     //Start with all iterations done
138     doneFlag = 1;
139     
140     //No Buffered doneInserting at the begining
141     bufferedDoneInserting = 0;
142
143     newKnowledgeSrc = _srclist;
144     newKnowledgeSrcSize = _nsrc;
145     if (_ndest == 0) {
146       newKnowledgeDest = new int[_nsrc];
147       newKnowledge = new int[_nsrc];
148       newKnowledgeDestSize = _nsrc;
149       newKnowledgeSize = _nsrc;
150       memcpy(newKnowledgeDest, _srclist, _nsrc);
151       memcpy(newKnowledge, _srclist, _nsrc);
152     } else {
153       newKnowledgeDest = _destlist;
154       newKnowledgeDestSize = _ndest;
155       int *tmplist = new int[CkNumPes()];
156       for (int i=0; i<CkNumPes(); ++i) tmplist[i]=0;
157       for (int i=0; i<_nsrc; ++i) tmplist[newKnowledgeSrc[i]]++;
158       for (int i=0; i<_ndest; ++i) tmplist[newKnowledgeDest[i]]++;
159       newKnowledgeSize = 0;
160       for (int i=0; i<CkNumPes(); ++i) if (tmplist[i]!=0) newKnowledgeSize++;
161       newKnowledge = new int[newKnowledgeSize];
162       for (int i=0, count=0; i<CkNumPes(); ++i) if (tmplist[i]!=0) newKnowledge[count++]=i;
163       delete [] tmplist;
164     }
165
166     setupRouter();
167     /*
168     npes = _npes;
169     //pelist = new int[npes];
170     pelist = _pelist;
171     //memcpy(pelist, _pelist, sizeof(int) * npes);    
172
173     if(npes <= 1)
174         routerID = USE_DIRECT;
175
176     myPe = -1;
177     setReverseMap();
178
179     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
180                  myPe, npes);
181
182     if(myPe < 0) {
183         //I am not part of this strategy
184         router = NULL;
185         routerID = USE_DIRECT;
186         return;        
187     }
188
189     switch(stratid) {
190     case USE_TREE: 
191         router = new TreeRouter(npes, myPe);
192         break;
193         
194     case USE_MESH:
195         router = new GridRouter(npes, myPe);
196         break;
197         
198     case USE_HYPERCUBE:
199         router = new HypercubeRouter(npes, myPe);
200         break;
201         
202     case USE_GRID:
203         router = new D3GridRouter(npes, myPe);
204         break;
205         
206     case USE_PREFIX:
207         router = new PrefixRouter(npes, myPe);
208         break;
209
210     case USE_DIRECT: router = NULL;
211         break;
212         
213     default: CmiAbort("Unknown Strategy\n");
214         break;
215     }
216
217     if(router) {
218         router->SetMap(pelist);
219         //router->setDoneHandle(myDoneHandle);
220         //router->SetID(id);
221     }
222     */
223 }
224
225 void RouterStrategy::setupRouter() {
226     if (bcast_pemap != NULL && ndestPes != newKnowledgeDestSize) {
227       delete[] bcast_pemap;
228       bcast_pemap = NULL;
229     }
230
231     npes = newKnowledgeSize;
232     nsrcPes = newKnowledgeSrcSize;
233     ndestPes = newKnowledgeDestSize;
234     if (pelist != NULL) {
235       delete[] pelist;
236       delete[] srcPelist;
237       delete[] destPelist;
238     }
239     pelist = newKnowledge;
240     srcPelist = newKnowledgeSrc;
241     destPelist = newKnowledgeDest;
242
243     newKnowledge = NULL;
244
245     if (npes <= 1) routerID = USE_DIRECT;
246     else routerID = routerIDsaved;
247
248     myPe = -1;
249     setReverseMap();
250
251     ComlibPrintf("[%d] Router Strategy : %d, MYPE = %d, NUMPES = %d \n", CkMyPe(),  routerID, myPe, npes);
252
253     ComlibPrintf("[%d] router=%p\n", CkMyPe(),  router);
254
255         delete router;
256     router = NULL;
257
258     if (myPe < 0) {
259         //I am not part of this strategy
260         router = NULL;
261         routerID = USE_DIRECT;
262         return;        
263     }
264
265     switch(routerID) {
266     case USE_TREE: 
267         router = new TreeRouter(npes, myPe, this);
268         break;
269         
270     case USE_MESH:
271         router = new GridRouter(npes, myPe, this);
272         break;
273         
274     case USE_HYPERCUBE:
275         router = new HypercubeRouter(npes, myPe, this);
276         break;
277         
278     case USE_GRID:
279         router = new D3GridRouter(npes, myPe, this);
280         break;
281         
282     case USE_PREFIX:
283         router = new PrefixRouter(npes, myPe, this);
284         break;
285
286     case USE_DIRECT: router = NULL;
287         break;
288         
289     default: CmiAbort("Unknown Strategy\n");
290         break;
291     }
292
293     if(router) {
294         router->SetMap(pelist);
295         //router->setDoneHandle(myDoneHandle);
296         //router->SetID(id);
297     }
298 }
299
300 RouterStrategy::~RouterStrategy() {
301   ComlibPrintf("[%d] RouterStrategy destructor\n",CkMyPe());
302
303     delete [] pelist;
304     delete [] srcPelist;
305     delete [] destPelist;
306
307     if(bcast_pemap != NULL) delete [] bcast_pemap;
308     
309     delete [] procMap;
310
311     delete router;
312     router = NULL;
313 }
314
315 /// Receive a message from the upper layer and buffer it in the msgQ until
316 /// doneInserting is called. If the strategy is USE_DIRECT then just send it to the handleMessage method for the Strategy.
317 void RouterStrategy::insertMessage(MessageHolder *cmsg){
318         ComlibPrintf("[%d] RouterStrategy::insertMessage\n", CkMyPe());
319
320        
321   //if(myPe < 0)
322   //    CmiAbort("insertMessage: mype < 0\n");
323
324     int count = 0;
325     if(routerID == USE_DIRECT) {
326 #if 0
327         // THE OLD VERSION. THIS IS BAD, as it can cause messages to be lost before errors are detected.
328         if(cmsg->dest_proc == IS_BROADCAST) {
329           // ndestPes = npes;
330                 for(count = 0; count < ndestPes-1; count ++){
331                         CmiSyncSend(destPelist[count], cmsg->size, cmsg->getMessage());
332                         int destPe = destPelist[count];
333                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to PE %d\n", CkMyPe(), destPe );
334                 }
335                 if(ndestPes > 0){
336                         CmiSyncSendAndFree(destPelist[ndestPes-1], cmsg->size, cmsg->getMessage());
337                         int destPe = destPelist[ndestPes-1];
338                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to PE %d\n", CkMyPe(), destPe);
339                 }
340         }
341         else
342             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
343                                cmsg->getMessage());
344         delete cmsg;
345 #else
346         if(cmsg->dest_proc == IS_BROADCAST) {
347                 ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all PEs\n", CkMyPe());
348         
349 #if 0
350                 CmiSyncBroadcastAndFree(cmsg->size, cmsg->getMessage() ); // This ought to be the same as the following alternative
351 #else
352                 for(int destPe = 0; destPe < CkNumPes()-1; destPe++){
353                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), destPe );
354                         CmiSyncSend(destPe, cmsg->size, cmsg->getMessage());
355                 }
356                 if(CkNumPes()>0){
357                         CmiSyncSendAndFree(CkNumPes()-1, cmsg->size, cmsg->getMessage());
358                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), CkNumPes()-1 );
359                 }
360 #endif
361
362         }       
363         else {
364           CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, cmsg->getMessage());
365         }
366         delete cmsg;
367     
368 #endif
369     }
370     else {
371         if(cmsg->dest_proc >= 0) {
372             cmsg->pelist = &procMap[cmsg->dest_proc];
373             cmsg->npes = 1;
374         }
375         else if (cmsg->dest_proc == IS_BROADCAST){
376
377           // if we are calling a broadcast then we set AllToAll flag
378           id.isAllToAll = 1;
379
380             if(bcast_pemap == NULL) {
381                 bcast_pemap = new int[ndestPes];
382                 for(count = 0; count < ndestPes; count ++) {
383                     bcast_pemap[count] = count;
384                 }
385             }
386
387             cmsg->pelist = bcast_pemap;
388             cmsg->npes = npes;
389         }
390         
391         msgQ.push(cmsg);
392     }
393 }
394
395 void RouterStrategy::doneInserting(){
396   ComlibPrintf("[%d] RouterStrategy::doneInserting msgQ.length()=%d \n", CkMyPe(), msgQ.length());
397   
398   
399   if(myPe < 0) return; // nothing to do if I have not objects in my processor
400       //CmiAbort("insertMessage: mype < 0\n");
401
402     id.instanceID = getInstance();
403
404     //ComlibPrintf("Instance ID = %d\n", getInstance());
405     
406     if(doneFlag == 0) {
407         ComlibPrintf("[%d] Waiting for previous iteration to Finish\n", 
408                      CkMyPe());
409         bufferedDoneInserting = 1;
410         ComlibPrintf("[%d] RouterStrategy::doneInserting returning\n", CkMyPe());
411         return;
412     }
413     
414     if(routerID == USE_DIRECT) {
415       CkAssert(msgQ.length() == 0);
416       //DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
417       //memset((char *)m, 0, sizeof(DummyMsg)); 
418       //m->id.instanceID = getInstance();
419       ComlibPrintf("[%d] RouterStrategy::doneInserting calling notifyDone()\n", CkMyPe());
420       notifyDone();
421       return;
422     }
423
424     doneFlag = 0;
425     bufferedDoneInserting = 0;
426
427     id.refno ++;
428
429     if(msgQ.length() == 0) {
430         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
431         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
432         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
433         
434         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
435                                                      myPe, 
436                                                      sizeof(DummyMsg));
437         cmsg->isDummy = 1;
438         cmsg->pelist = &myPe;
439         cmsg->npes = 1;
440         msgQ.push(cmsg);
441     }
442
443     ComlibPrintf("Calling router->EachToManyMulticastQ??????????????????????????\n");
444     router->EachToManyMulticastQ(id, msgQ);
445
446 }
447
448 void RouterStrategy::deliver(char *msg, int size) {
449   CmiSyncSendAndFree(CkMyPe(), size, msg);
450 }
451
452 /// Update the router accordingly to the new information. If the router is
453 /// currently active (doneFlag==0), then wait for it to finish and store the
454 /// knowledge in "newKnowledge"
455 void RouterStrategy::bracketedUpdatePeKnowledge(int *count) {
456   ComlibPrintf("[%d] RouterStrategy: Updating knowledge\n", CkMyPe());
457
458   newKnowledgeSize = 0;
459   newKnowledgeSrcSize = 0;
460   newKnowledgeDestSize = 0;
461   for (int i=0; i<CkNumPes(); ++i) {
462     if (count[i] != 0) newKnowledgeSize++;
463     if ((count[i]&2) == 2) newKnowledgeDestSize++;
464     if ((count[i]&1) == 1) newKnowledgeSrcSize++;
465   }
466   newKnowledge = new int[newKnowledgeSize];
467   newKnowledgeSrc = new int[newKnowledgeSrcSize];
468   newKnowledgeDest = new int[newKnowledgeDestSize];
469   
470   for(int i=0;i<newKnowledgeDestSize;i++){
471           newKnowledgeDest[i] = -1;
472   }
473
474   for(int i=0;i<newKnowledgeSrcSize;i++){
475           newKnowledgeSrc[i] = -1;
476   }
477
478   for(int i=0;i<newKnowledgeSize;i++){
479           newKnowledge[i] = -1;
480   }
481
482   int c=0, cS=0, cD=0;
483   for (int i=0; i<CkNumPes(); ++i) {
484     if (count[i] != 0) newKnowledge[c++]=i;
485     if ((count[i]&2) == 2) newKnowledgeDest[cD++]=i;
486     if ((count[i]&1) == 1) newKnowledgeSrc[cS++]=i;
487   }
488   
489   ComlibPrintf("[%d] RouterStrategy::bracketedUpdatePeKnowledge c=%d cS=%d cD=%d\n", CkMyPe(), c, cS, cD);
490
491   for (int i=0; i<newKnowledgeDestSize; ++i) {
492     ComlibPrintf("[%d] RouterStrategy::bracketedUpdatePeKnowledge newKnowledgeDest[%d]=%d\n", CkMyPe(), i, newKnowledgeDest[i]);
493   }
494
495   
496   
497   if (doneFlag == 0) return;
498
499   // here we can update the knowledge
500   setupRouter();
501 }
502
503 void RouterStrategy::notifyDone(){
504
505     ComlibPrintf("[%d] RouterStrategy: Finished iteration\n", CkMyPe());
506
507     if(doneHandle > 0) {
508       DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
509       memset((char *)m, 0, sizeof(DummyMsg)); 
510       m->id.instanceID = getInstance();
511       CmiSetHandler(m, doneHandle);
512       CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
513     }
514
515     doneFlag = 1;
516     // at this point, if we have some knowledge update, we apply it
517     if (newKnowledge != NULL) {
518       //bracketedUpdatePeKnowledge(newKnowledge);
519       setupRouter();
520     }
521
522     if(bufferedDoneInserting) doneInserting();
523 }
524
525 void RouterStrategy::pup(PUP::er &p) {
526   ComlibPrintf("[%d] RouterStrategy::pup called for %s\n",CkMyPe(),
527                p.isPacking()?"packing":(p.isUnpacking()?"unpacking":"sizing"));
528   Strategy::pup(p);
529
530   p | id;
531   if (p.isUnpacking()) {
532     pelist = NULL;
533     bcast_pemap = NULL;
534     procMap = new int[CkNumPes()];
535     doneFlag = 1;
536     bufferedDoneInserting = 0;
537   }
538
539   p | npes;
540   p | nsrcPes;
541   p | ndestPes;
542   newKnowledgeSize = npes;
543   newKnowledgeSrcSize = nsrcPes;
544   newKnowledgeDestSize = ndestPes;
545
546   if (p.isUnpacking()) {
547     newKnowledge = new int[npes];
548     newKnowledgeSrc = new int[nsrcPes];
549     newKnowledgeDest = new int[ndestPes];
550   } else {
551     newKnowledge = pelist;
552     newKnowledgeSrc = srcPelist;
553     newKnowledgeDest = destPelist;
554   }
555   p(newKnowledge, npes);
556   p(newKnowledgeSrc, nsrcPes);
557   p(newKnowledgeDest, ndestPes);
558
559   p | routerIDsaved;
560   p | doneHandle;
561
562   if (p.isUnpacking()) {
563           // Because we are unpacking, the router strategy should be initialized to NULL, so that setupRouter will correctly instantiate it
564           router = NULL;
565           setupRouter();
566   }
567   else newKnowledge = NULL;
568 }
569
570 PUPable_def(RouterStrategy);
571
572 /*@}*/