New version which reduces the number of virtual function calls
[charm.git] / src / conv-com / routerstrategy.C
1
2 #include "routerstrategy.h"
3
4 CkpvDeclare(int, RecvHandle);
5 CkpvDeclare(int, ProcHandle);
6 CkpvDeclare(int, DummyHandle);
7
8 //Handlers that call the entry funtions of routers 
9 //Refer to router.h for details on these entry functions
10
11 //Correspods to Router::ProcManyMsg
12 void procManyCombinedMsg(char *msg)
13 {
14     //comID id;
15     int instance_id;
16
17     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
18     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
19
20     //Comid specific
21     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
22            , sizeof(int));
23
24     Strategy *s = ConvComlibGetStrategy(instance_id);
25     ((RouterStrategy *)s)->ProcManyMsg(msg);
26 }
27
28 //Correspods to Router::DummyEP
29 void dummyEP(DummyMsg *m)
30 {
31     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
32     
33     ((RouterStrategy *)s)->DummyEP(m);
34 }
35
36 //Correspods to Router::RecvManyMsg
37 void recvManyCombinedMsg(char *msg)
38 {
39     //comID id;
40     int instance_id;
41     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
42     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
43     
44     //Comid specific
45     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
46            , sizeof(int));
47
48     Strategy *s = ConvComlibGetStrategy(instance_id);
49     ((RouterStrategy *)s)->RecvManyMsg(msg);
50 }
51
52
53 void doneHandler(DummyMsg *m){
54     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
55     
56     ((RouterStrategy *)s)->Done(m);
57 }
58
59 void RouterStrategy::setReverseMap(){
60     int pcount;
61     for(pcount = 0; pcount < CkNumPes(); pcount++)
62         procMap[pcount] = -1;
63
64     //All processors not in the domain will point to -1
65     for(pcount = 0; pcount < npes; pcount++) {
66         if (pelist[pcount] == CkMyPe())
67             myPe = pcount;
68
69         procMap[pelist[pcount]] = pcount;
70     }
71 }
72
73 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
74                                int *_pelist) 
75     : Strategy(){
76     
77     setType(CONVERSE_STRATEGY);
78
79     CkpvInitialize(int, RecvHandle);
80     CkpvInitialize(int, ProcHandle);
81     CkpvInitialize(int, DummyHandle);
82
83     id.instanceID = 0; //Set later in doneInserting
84     
85     id.isAllToAll = 0;
86     id.refno = 0;
87
88     CkpvAccess(RecvHandle) =
89         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
90     CkpvAccess(ProcHandle) =
91         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
92     CkpvAccess(DummyHandle) = 
93         CkRegisterHandler((CmiHandler)dummyEP);    
94
95     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
96
97     //Array strategy done handle
98     doneHandle = handle;
99
100     routerID = stratid;
101
102     npes = _npes;
103     //pelist = new int[npes];
104     pelist = _pelist;
105     //memcpy(pelist, _pelist, sizeof(int) * npes);    
106
107     procMap = new int[CkNumPes()];    
108     setReverseMap();
109
110     bcast_pemap = NULL;
111
112     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
113                  myPe, npes);
114
115     if(myPe < 0) {
116         //I am not part of this strategy
117
118         doneFlag = 0;
119         router = NULL;
120         bufferedDoneInserting = 0;
121         return;        
122     }
123
124     //Start with all iterations done
125     doneFlag = 1;
126     
127     //No Buffered doneInserting at the begining
128     bufferedDoneInserting = 0;
129
130     switch(stratid) {
131     case USE_TREE: 
132         router = new TreeRouter(npes, myPe);
133         break;
134         
135     case USE_MESH:
136         router = new GridRouter(npes, myPe);
137         break;
138         
139     case USE_HYPERCUBE:
140         router = new DimexRouter(npes, myPe);
141         break;
142         
143     case USE_GRID:
144         router = new D3GridRouter(npes, myPe);
145         break;
146
147     case USE_DIRECT: router = NULL;
148         break;
149         
150     default: CmiAbort("Unknown Strategy\n");
151         break;
152     }
153
154     if(router) {
155         router->SetMap(pelist);
156         router->setDoneHandle(myDoneHandle);
157         //router->SetID(id);
158     }
159 }
160
161
162 RouterStrategy::~RouterStrategy() {
163     //delete [] pelist;
164
165     if(bcast_pemap)
166         delete [] bcast_pemap;
167     
168     delete [] procMap;
169     if(router)
170         delete router;
171 }
172
173
174 void RouterStrategy::insertMessage(MessageHolder *cmsg){
175
176     if(myPe < 0)
177         CmiAbort("insertMessage: mype < 0\n");
178
179     int count = 0;
180     if(routerID == USE_DIRECT) {
181         if(cmsg->dest_proc == IS_BROADCAST) {
182             for(count = 0; count < cmsg->npes-1; count ++)
183                 CmiSyncSend(cmsg->pelist[count], cmsg->size, 
184                             cmsg->getMessage());
185             if(cmsg->npes > 0)
186                 CmiSyncSendAndFree(cmsg->pelist[cmsg->npes-1], cmsg->size, 
187                                    cmsg->getMessage());
188         }
189         else
190             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
191                                cmsg->getMessage());
192         delete cmsg;
193     }
194     else {
195         if(cmsg->dest_proc >= 0) {
196             cmsg->pelist = &procMap[cmsg->dest_proc];
197             cmsg->npes = 1;
198         }
199         else if (cmsg->dest_proc == IS_BROADCAST){
200
201             if(bcast_pemap == NULL) {
202                 bcast_pemap = new int[npes];
203                 for(count = 0; count < npes; count ++) {
204                     bcast_pemap[count] = count;
205                 }
206             }
207
208             cmsg->pelist = bcast_pemap;
209             cmsg->npes = npes;
210         }
211         
212         msgQ.push(cmsg);
213     }
214 }
215
216 void RouterStrategy::doneInserting(){
217     
218     if(myPe < 0)
219         CmiAbort("insertMessage: mype < 0\n");
220
221     id.instanceID = getInstance();
222
223     //ComlibPrintf("Instance ID = %d\n", getInstance());
224     ComlibPrintf("%d: DoneInserting %d \n", CkMyPe(), msgQ.length());
225     
226     if(doneFlag == 0) {
227         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
228                      CkMyPe());
229         bufferedDoneInserting = 1;
230         return;
231     }
232     
233     if(routerID == USE_DIRECT) {
234         DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
235         memset((char *)m, 0, sizeof(DummyMsg)); 
236         m->id.instanceID = getInstance();
237         
238         Done(m);
239         return;
240     }
241
242     doneFlag = 0;
243     bufferedDoneInserting = 0;
244
245     id.refno ++;
246
247     if(msgQ.length() == 0) {
248         if(routerID == USE_DIRECT)
249             return;
250
251         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
252         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
253         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
254         
255         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
256                                                      myPe, 
257                                                      sizeof(DummyMsg));
258         cmsg->isDummy = 1;
259         cmsg->pelist = &myPe;
260         cmsg->npes = 1;
261         msgQ.push(cmsg);
262     }
263
264     /*  // OLDER version which called an extra virtual method for each
265         //message 
266       int numToDeposit = msgQ.length();
267       
268       while(!msgQ.isEmpty()) {
269       MessageHolder *cmsg = msgQ.deq();
270       char *msg = cmsg->getMessage();
271       
272       if(!cmsg->isDummy)  {            
273       //Assuming list of processors to multicast to is in the
274       //order of relative processors numbering and NOT absolute
275       //processor numbering
276       
277       if(cmsg->dest_proc == IS_BROADCAST) {  
278       router->EachToManyMulticast(id, cmsg->size, msg, cmsg->npes, 
279       cmsg->pelist, 
280       numToDeposit > 1);
281       }            
282       else {                                
283       
284       ComlibPrintf("%d: Insert Pers. Message to %d\n", CkMyPe(), procMap[cmsg->dest_proc]);
285       router->EachToManyMulticast(id, cmsg->size, msg, 1,
286       &procMap[cmsg->dest_proc],
287       numToDeposit > 1);
288       }            
289       }   
290       else
291       router->EachToManyMulticast(id, cmsg->size, msg, 1, &myPe, 
292       numToDeposit > 1);
293       
294       numToDeposit --;
295       delete cmsg;        
296       }
297     */
298     
299     router->EachToManyMulticastQ(id, msgQ);
300
301     while(!recvQ.isEmpty()) {
302         char *msg = recvQ.deq();
303         RecvManyMsg(msg);
304     }
305
306     while(!procQ.isEmpty()) {
307         char *msg = procQ.deq();
308         ProcManyMsg(msg);
309     }
310
311     while(!dummyQ.isEmpty() > 0) {
312         DummyMsg *m = dummyQ.deq();
313         router->DummyEP(m->id, m->magic);
314         CmiFree(m);
315     }
316 }
317
318 void RouterStrategy::Done(DummyMsg *m){
319
320     ComlibPrintf("%d: Finished iteration\n", CkMyPe());
321
322     if(doneHandle > 0) {
323         CmiSetHandler(m, doneHandle);
324         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
325     }
326     else
327         CmiFree(m);
328
329     doneFlag = 1;
330
331     if(bufferedDoneInserting)
332         doneInserting();
333 }
334
335
336 //Implement it later while implementing checkpointing of Comlib
337 void RouterStrategy::pup(PUP::er &p){}
338
339 PUPable_def(RouterStrategy);