The new version of comlib! This version passed "make test" in charm/tests on order...
[charm.git] / src / conv-com / routerstrategy.C
1 /**
2    @addtogroup ConvComlibRouter
3    @{
4    @file
5 */
6
7
8 #include "routerstrategy.h"
9
10 #include "gridrouter.h"
11 #include "graphrouter.h"
12 #include "hypercuberouter.h"
13 #include "treerouter.h"
14 #include "3dgridrouter.h"
15 #include "prefixrouter.h"
16 #include "converse.h"
17 #include "charm++.h"
18
19 //Handlers that call the entry functions of routers
20 //Refer to router.h for details on these entry functions
21
22 CkpvDeclare(int, RouterProcHandle);
23 ///Correspods to Router::ProcManyMsg
24 void routerProcManyCombinedMsg(char *msg) {
25     //comID id;
26     int instance_id;
27
28     ComlibPrintf("In Proc combined message at %d\n", CkMyPe());
29     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
30
31     //Comid specific -- to change to a better reading!
32     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
33            , sizeof(int));
34
35     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(instance_id);
36     s->getRouter()->ProcManyMsg(s->getComID(), msg);
37 }
38
39 CkpvDeclare(int, RouterDummyHandle);
40 ///Correspods to Router::DummyEP
41 void routerDummyMsg(DummyMsg *m) {
42     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(m->id.instanceID);
43     s->getRouter()->DummyEP(m->id, m->magic);
44 }
45
46 CkpvDeclare(int, RouterRecvHandle);
47 ///Correspods to Router::RecvManyMsg
48 void routerRecvManyCombinedMsg(char *msg) {
49     //comID id;
50     int instance_id;
51     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
52     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
53     
54     //Comid specific -- to change to a better reading!
55     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
56            , sizeof(int));
57
58     RouterStrategy *s = (RouterStrategy*)ConvComlibGetStrategy(instance_id);
59     s->getRouter()->RecvManyMsg(s->getComID(), msg);
60 }
61
62 /* DEPRECATED: method notifyDone substitutes it
63 void doneHandler(DummyMsg *m){
64     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
65     
66     ((RouterStrategy *)s)->Done(m);
67 }
68 */
69
70 void RouterStrategy::setReverseMap(){
71     int pcount;
72     for(pcount = 0; pcount < CkNumPes(); pcount++)
73         procMap[pcount] = -1;
74
75     //All processors not in the domain will point to -1
76     for(pcount = 0; pcount < npes; pcount++) {
77         if (pelist[pcount] == CkMyPe())
78             myPe = pcount;
79
80         procMap[pelist[pcount]] = pcount;
81     }
82 }
83
84 RouterStrategy::RouterStrategy(int stratid) {
85   ComlibPrintf("[%d] RouterStrategy protected constructor\n",CkMyPe());
86   setType(CONVERSE_STRATEGY);
87   id.instanceID = 0;
88   id.isAllToAll = 0;
89   id.refno = 0;
90   doneHandle = 0;
91   routerIDsaved = stratid;
92   router = NULL;
93   pelist = NULL;
94   bcast_pemap = NULL;
95   procMap = new int[CkNumPes()];
96   doneFlag = 1;
97   bufferedDoneInserting = 0;
98 }
99
100 RouterStrategy::RouterStrategy(int stratid, int handle, int _nsrc, int *_srclist,
101                                int _ndest, int *_destlist) : Strategy() {
102
103   ComlibPrintf("[%d] RouterStrategy constructor\n",CkMyPe());
104
105     setType(CONVERSE_STRATEGY);
106
107     //CkpvInitialize(int, RecvHandle);
108     //CkpvInitialize(int, ProcHandle);
109     //CkpvInitialize(int, DummyHandle);
110
111     id.instanceID = 0; //Set later in doneInserting
112     
113     id.isAllToAll = 0;
114     id.refno = 0;
115
116     /*
117     CkpvAccess(RecvHandle) =
118         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
119     CkpvAccess(ProcHandle) =
120         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
121     CkpvAccess(DummyHandle) = 
122         CkRegisterHandler((CmiHandler)dummyEP);    
123     */
124
125     //myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
126
127     // Iteration done handle
128     doneHandle = handle;
129
130     routerIDsaved = stratid;
131
132     router = NULL;
133     pelist = NULL;
134     bcast_pemap = NULL;
135     procMap = new int[CkNumPes()];    
136
137     //Start with all iterations done
138     doneFlag = 1;
139     
140     //No Buffered doneInserting at the begining
141     bufferedDoneInserting = 0;
142
143     newKnowledgeSrc = _srclist;
144     newKnowledgeSrcSize = _nsrc;
145     if (_ndest == 0) {
146       newKnowledgeDest = new int[_nsrc];
147       newKnowledge = new int[_nsrc];
148       newKnowledgeDestSize = _nsrc;
149       newKnowledgeSize = _nsrc;
150       memcpy(newKnowledgeDest, _srclist, _nsrc);
151       memcpy(newKnowledge, _srclist, _nsrc);
152     } else {
153       newKnowledgeDest = _destlist;
154       newKnowledgeDestSize = _ndest;
155       int *tmplist = new int[CkNumPes()];
156       for (int i=0; i<CkNumPes(); ++i) tmplist[i]=0;
157       for (int i=0; i<_nsrc; ++i) tmplist[newKnowledgeSrc[i]]++;
158       for (int i=0; i<_ndest; ++i) tmplist[newKnowledgeDest[i]]++;
159       newKnowledgeSize = 0;
160       for (int i=0; i<CkNumPes(); ++i) if (tmplist[i]!=0) newKnowledgeSize++;
161       newKnowledge = new int[newKnowledgeSize];
162       for (int i=0, count=0; i<CkNumPes(); ++i) if (tmplist[i]!=0) newKnowledge[count++]=i;
163       delete [] tmplist;
164     }
165
166     setupRouter();
167     /*
168     npes = _npes;
169     //pelist = new int[npes];
170     pelist = _pelist;
171     //memcpy(pelist, _pelist, sizeof(int) * npes);    
172
173     if(npes <= 1)
174         routerID = USE_DIRECT;
175
176     myPe = -1;
177     setReverseMap();
178
179     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
180                  myPe, npes);
181
182     if(myPe < 0) {
183         //I am not part of this strategy
184         router = NULL;
185         routerID = USE_DIRECT;
186         return;        
187     }
188
189     switch(stratid) {
190     case USE_TREE: 
191         router = new TreeRouter(npes, myPe);
192         break;
193         
194     case USE_MESH:
195         router = new GridRouter(npes, myPe);
196         break;
197         
198     case USE_HYPERCUBE:
199         router = new HypercubeRouter(npes, myPe);
200         break;
201         
202     case USE_GRID:
203         router = new D3GridRouter(npes, myPe);
204         break;
205         
206     case USE_PREFIX:
207         router = new PrefixRouter(npes, myPe);
208         break;
209
210     case USE_DIRECT: router = NULL;
211         break;
212         
213     default: CmiAbort("Unknown Strategy\n");
214         break;
215     }
216
217     if(router) {
218         router->SetMap(pelist);
219         //router->setDoneHandle(myDoneHandle);
220         //router->SetID(id);
221     }
222     */
223 }
224
225 void RouterStrategy::setupRouter() {
226     if (bcast_pemap != NULL && ndestPes != newKnowledgeDestSize) {
227       delete[] bcast_pemap;
228       bcast_pemap = NULL;
229     }
230
231     npes = newKnowledgeSize;
232     nsrcPes = newKnowledgeSrcSize;
233     ndestPes = newKnowledgeDestSize;
234     if (pelist != NULL) {
235       delete[] pelist;
236       delete[] srcPelist;
237       delete[] destPelist;
238     }
239     pelist = newKnowledge;
240     srcPelist = newKnowledgeSrc;
241     destPelist = newKnowledgeDest;
242
243     newKnowledge = NULL;
244
245     if (npes <= 1) routerID = USE_DIRECT;
246     else routerID = routerIDsaved;
247
248     myPe = -1;
249     setReverseMap();
250
251     ComlibPrintf("[%d] Router Strategy : %d, MYPE = %d, NUMPES = %d \n", CkMyPe(),  routerID, myPe, npes);
252
253     ComlibPrintf("[%d] router=%p\n", CkMyPe(),  router);
254
255         delete router;
256     router = NULL;
257
258     if (myPe < 0) {
259         //I am not part of this strategy
260         router = NULL;
261         routerID = USE_DIRECT;
262         return;        
263     }
264
265     switch(routerID) {
266     case USE_TREE: 
267         router = new TreeRouter(npes, myPe, this);
268         break;
269         
270     case USE_MESH:
271         router = new GridRouter(npes, myPe, this);
272         break;
273         
274     case USE_HYPERCUBE:
275         router = new HypercubeRouter(npes, myPe, this);
276         break;
277         
278     case USE_GRID:
279         router = new D3GridRouter(npes, myPe, this);
280         break;
281         
282     case USE_PREFIX:
283         router = new PrefixRouter(npes, myPe, this);
284         break;
285
286     case USE_DIRECT: router = NULL;
287         break;
288         
289     default: CmiAbort("Unknown Strategy\n");
290         break;
291     }
292
293     if(router) {
294         router->SetMap(pelist);
295         //router->setDoneHandle(myDoneHandle);
296         //router->SetID(id);
297     }
298 }
299
300 RouterStrategy::~RouterStrategy() {
301   ComlibPrintf("[%d] RouterStrategy destructor\n",CkMyPe());
302
303     delete [] pelist;
304     delete [] srcPelist;
305     delete [] destPelist;
306
307     if(bcast_pemap != NULL) delete [] bcast_pemap;
308     
309     delete [] procMap;
310
311     delete router;
312     router = NULL;
313 }
314
315 /// Receive a message from the upper layer and buffer it in the msgQ until
316 /// doneInserting is called. If the strategy is USE_DIRECT then just send it.
317 void RouterStrategy::insertMessage(MessageHolder *cmsg){
318         ComlibPrintf("[%d] RouterStrategy::insertMessage\n", CkMyPe());
319
320         
321         for(int i = 0; i < ndestPes; i++){
322                 int destPe = destPelist[i];
323                 ComlibPrintf("[%d] RouterStrategy::insertMessage destPelist[%d]=%d\n", CkMyPe(), i, destPe );
324         }
325         
326   //if(myPe < 0)
327   //    CmiAbort("insertMessage: mype < 0\n");
328
329     int count = 0;
330     if(routerID == USE_DIRECT) {
331 #if 0
332         // THE OLD VERSION. THIS IS BAD, as it can cause messages to be lost before errors are detected.
333         if(cmsg->dest_proc == IS_BROADCAST) {
334           // ndestPes = npes;
335                 for(count = 0; count < ndestPes-1; count ++){
336                         CmiSyncSend(destPelist[count], cmsg->size, cmsg->getMessage());
337                         int destPe = destPelist[count];
338                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to PE %d\n", CkMyPe(), destPe );
339                 }
340                 if(ndestPes > 0){
341                         CmiSyncSendAndFree(destPelist[ndestPes-1], cmsg->size, cmsg->getMessage());
342                         int destPe = destPelist[ndestPes-1];
343                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to PE %d\n", CkMyPe(), destPe);
344                 }
345         }
346         else
347             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
348                                cmsg->getMessage());
349         delete cmsg;
350 #else
351         if(cmsg->dest_proc == IS_BROADCAST) {
352                 ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all PEs\n", CkMyPe());
353                 for(int destPe = 0; destPe < CkNumPes()-1; destPe++){
354                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), destPe );
355                         CmiSyncSend(destPe, cmsg->size, cmsg->getMessage());
356                 }
357                 if(CkNumPes()>0){
358                         CmiSyncSendAndFree(CkNumPes()-1, cmsg->size, cmsg->getMessage());
359                         ComlibPrintf("[%d] RouterStrategy::insertMessage Broadcasting to all, PE %d\n", CkMyPe(), CkNumPes()-1 );
360                 }
361         }       
362         else
363             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
364                                cmsg->getMessage());
365         delete cmsg;
366     
367 #endif
368     }
369     else {
370         if(cmsg->dest_proc >= 0) {
371             cmsg->pelist = &procMap[cmsg->dest_proc];
372             cmsg->npes = 1;
373         }
374         else if (cmsg->dest_proc == IS_BROADCAST){
375
376           // if we are calling a broadcast then we set AllToAll flag
377           id.isAllToAll = 1;
378
379             if(bcast_pemap == NULL) {
380                 bcast_pemap = new int[ndestPes];
381                 for(count = 0; count < ndestPes; count ++) {
382                     bcast_pemap[count] = count;
383                 }
384             }
385
386             cmsg->pelist = bcast_pemap;
387             cmsg->npes = npes;
388         }
389         
390         msgQ.push(cmsg);
391     }
392 }
393
394 void RouterStrategy::doneInserting(){
395         ComlibPrintf("[%d] RouterStrategy::doneInserting", CkMyPe());
396         
397   if(myPe < 0) return; // nothing to do if I have not objects in my processor
398       //CmiAbort("insertMessage: mype < 0\n");
399
400     id.instanceID = getInstance();
401
402     //ComlibPrintf("Instance ID = %d\n", getInstance());
403     ComlibPrintf("[%d] RouterStrategy::doneInserting %d \n", CkMyPe(), msgQ.length());
404     
405     if(doneFlag == 0) {
406         ComlibPrintf("[%d] Waiting for previous iteration to Finish\n", 
407                      CkMyPe());
408         bufferedDoneInserting = 1;
409         ComlibPrintf("[%d] RouterStrategy::doneInserting returning\n", CkMyPe());
410         return;
411     }
412     
413     if(routerID == USE_DIRECT) {
414       //DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
415       //memset((char *)m, 0, sizeof(DummyMsg)); 
416       //m->id.instanceID = getInstance();
417       ComlibPrintf("[%d] RouterStrategy::doneInserting calling notifyDone()\n", CkMyPe());
418       notifyDone();
419       return;
420     }
421
422     doneFlag = 0;
423     bufferedDoneInserting = 0;
424
425     id.refno ++;
426
427     if(msgQ.length() == 0) {
428         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
429         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
430         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
431         
432         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
433                                                      myPe, 
434                                                      sizeof(DummyMsg));
435         cmsg->isDummy = 1;
436         cmsg->pelist = &myPe;
437         cmsg->npes = 1;
438         msgQ.push(cmsg);
439     }
440
441     router->EachToManyMulticastQ(id, msgQ);
442
443     /* These were needed in the case of more than one iteration executing at the same time.
444        For the moment this is not the case.
445     while(!recvQ.isEmpty()) {
446         char *msg = recvQ.deq();
447         RecvManyMsg(msg);
448     }
449
450     while(!procQ.isEmpty()) {
451         char *msg = procQ.deq();
452         ProcManyMsg(msg);
453     }
454
455     while(!dummyQ.isEmpty() > 0) {
456         DummyMsg *m = dummyQ.deq();
457         router->DummyEP(m->id, m->magic);
458         CmiFree(m);
459     }
460     */
461 }
462
463 void RouterStrategy::deliver(char *msg, int size) {
464   CmiSyncSendAndFree(CkMyPe(), size, msg);
465 }
466
467 /// Update the router accordingly to the new information. If the router is
468 /// currently active (doneFlag==0), then wait for it to finish and store the
469 /// knowledge in "newKnowledge"
470 void RouterStrategy::bracketedUpdatePeKnowledge(int *count) {
471   ComlibPrintf("[%d] RouterStrategy: Updating knowledge\n", CkMyPe());
472
473   newKnowledgeSize = 0;
474   newKnowledgeSrcSize = 0;
475   newKnowledgeDestSize = 0;
476   for (int i=0; i<CkNumPes(); ++i) {
477     if (count[i] != 0) newKnowledgeSize++;
478     if ((count[i]&2) == 2) newKnowledgeDestSize++;
479     if ((count[i]&1) == 1) newKnowledgeSrcSize++;
480   }
481   newKnowledge = new int[newKnowledgeSize];
482   newKnowledgeSrc = new int[newKnowledgeSrcSize];
483   newKnowledgeDest = new int[newKnowledgeDestSize];
484   
485   for(int i=0;i<newKnowledgeDestSize;i++){
486           newKnowledgeDest[i] = -1;
487   }
488
489   for(int i=0;i<newKnowledgeSrcSize;i++){
490           newKnowledgeSrc[i] = -1;
491   }
492
493   for(int i=0;i<newKnowledgeSize;i++){
494           newKnowledge[i] = -1;
495   }
496
497   int c=0, cS=0, cD=0;
498   for (int i=0; i<CkNumPes(); ++i) {
499     if (count[i] != 0) newKnowledge[c++]=i;
500     if ((count[i]&2) == 2) newKnowledgeDest[cD++]=i;
501     if ((count[i]&1) == 1) newKnowledgeSrc[cS++]=i;
502   }
503   
504   ComlibPrintf("[%d] RouterStrategy::bracketedUpdatePeKnowledge c=%d cS=%d cD=%d\n", CkMyPe(), c, cS, cD);
505
506   for (int i=0; i<newKnowledgeDestSize; ++i) {
507     ComlibPrintf("[%d] RouterStrategy::bracketedUpdatePeKnowledge newKnowledgeDest[%d]=%d\n", CkMyPe(), i, newKnowledgeDest[i]);
508   }
509
510   
511   
512   if (doneFlag == 0) return;
513
514   // here we can update the knowledge
515   setupRouter();
516 }
517
518 void RouterStrategy::notifyDone(){
519
520     ComlibPrintf("[%d] RouterStrategy: Finished iteration\n", CkMyPe());
521
522     if(doneHandle > 0) {
523       DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
524       memset((char *)m, 0, sizeof(DummyMsg)); 
525       m->id.instanceID = getInstance();
526       CmiSetHandler(m, doneHandle);
527       CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
528     }
529
530     doneFlag = 1;
531     // at this point, if we have some knowledge update, we apply it
532     if (newKnowledge != NULL) {
533       //bracketedUpdatePeKnowledge(newKnowledge);
534       setupRouter();
535     }
536
537     if(bufferedDoneInserting) doneInserting();
538 }
539
540 void RouterStrategy::pup(PUP::er &p) {
541   ComlibPrintf("[%d] RouterStrategy::pup called for %s\n",CkMyPe(),
542                p.isPacking()?"packing":(p.isUnpacking()?"unpacking":"sizing"));
543   Strategy::pup(p);
544
545   p | id;
546   if (p.isUnpacking()) {
547     pelist = NULL;
548     bcast_pemap = NULL;
549     procMap = new int[CkNumPes()];
550     doneFlag = 1;
551     bufferedDoneInserting = 0;
552   }
553
554   p | npes;
555   p | nsrcPes;
556   p | ndestPes;
557   newKnowledgeSize = npes;
558   newKnowledgeSrcSize = nsrcPes;
559   newKnowledgeDestSize = ndestPes;
560
561   if (p.isUnpacking()) {
562     newKnowledge = new int[npes];
563     newKnowledgeSrc = new int[nsrcPes];
564     newKnowledgeDest = new int[ndestPes];
565   } else {
566     newKnowledge = pelist;
567     newKnowledgeSrc = srcPelist;
568     newKnowledgeDest = destPelist;
569   }
570   p(newKnowledge, npes);
571   p(newKnowledgeSrc, nsrcPes);
572   p(newKnowledgeDest, ndestPes);
573
574   p | routerIDsaved;
575   p | doneHandle;
576
577   if (p.isUnpacking()) {
578           // Because we are unpacking, the router strategy should be initialized to NULL, so that setupRouter will correctly instantiate it
579           router = NULL;
580           setupRouter();
581   }
582   else newKnowledge = NULL;
583 }
584
585 PUPable_def(RouterStrategy);
586
587 /*@}*/