fixing multicast
[charm.git] / src / conv-com / routerstrategy.C
1
2 #include "routerstrategy.h"
3
4 CkpvDeclare(int, RecvHandle);
5 CkpvDeclare(int, ProcHandle);
6 CkpvDeclare(int, DummyHandle);
7
8 //Handlers that call the entry funtions of routers 
9 //Refer to router.h for details on these entry functions
10
11 //Correspods to Router::ProcManyMsg
12 void procManyCombinedMsg(char *msg)
13 {
14     //comID id;
15     int instance_id;
16
17     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
18     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
19
20     //Comid specific
21     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
22            , sizeof(int));
23
24     Strategy *s = ConvComlibGetStrategy(instance_id);
25     ((RouterStrategy *)s)->ProcManyMsg(msg);
26 }
27
28 //Correspods to Router::DummyEP
29 void dummyEP(DummyMsg *m)
30 {
31     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
32     
33     ((RouterStrategy *)s)->DummyEP(m);
34 }
35
36 //Correspods to Router::RecvManyMsg
37 void recvManyCombinedMsg(char *msg)
38 {
39     //comID id;
40     int instance_id;
41     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
42     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
43     
44     //Comid specific
45     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
46            , sizeof(int));
47
48     Strategy *s = ConvComlibGetStrategy(instance_id);
49     ((RouterStrategy *)s)->RecvManyMsg(msg);
50 }
51
52
53 void doneHandler(DummyMsg *m){
54     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
55     
56     ((RouterStrategy *)s)->Done(m);
57 }
58
59 void RouterStrategy::setReverseMap(){
60     int pcount;
61     for(pcount = 0; pcount < CkNumPes(); pcount++)
62         procMap[pcount] = -1;
63
64     //All processors not in the domain will point to -1
65     for(pcount = 0; pcount < npes; pcount++) {
66         if (pelist[pcount] == CkMyPe())
67             myPe = pcount;
68
69         procMap[pelist[pcount]] = pcount;
70     }
71 }
72
73 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
74                                int *_pelist) 
75     : Strategy(){
76     
77     setType(CONVERSE_STRATEGY);
78
79     CkpvInitialize(int, RecvHandle);
80     CkpvInitialize(int, ProcHandle);
81     CkpvInitialize(int, DummyHandle);
82
83     id.instanceID = 0; //Set later in doneInserting
84     
85     id.isAllToAll = 0;
86     id.refno = 0;
87
88     CkpvAccess(RecvHandle) =
89         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
90     CkpvAccess(ProcHandle) =
91         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
92     CkpvAccess(DummyHandle) = 
93         CkRegisterHandler((CmiHandler)dummyEP);    
94
95     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
96
97     //Array strategy done handle
98     doneHandle = handle;
99
100     routerID = stratid;
101
102     npes = _npes;
103     pelist = new int[npes];
104     procMap = new int[CkNumPes()];
105     
106     memcpy(pelist, _pelist, sizeof(int) * npes);    
107     setReverseMap();
108
109     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
110            myPe, npes);
111
112     switch(stratid) {
113     case USE_TREE: 
114         router = new TreeRouter(npes, myPe);
115         break;
116         
117     case USE_MESH:
118         router = new GridRouter(npes, myPe);
119         break;
120         
121     case USE_HYPERCUBE:
122         router = new DimexRouter(npes, myPe);
123         break;
124         
125     case USE_GRID:
126         router = new D3GridRouter(npes, myPe);
127         break;
128
129     case USE_DIRECT: router = NULL;
130         break;
131         
132     default: CmiAbort("Unknown Strategy\n");
133         break;
134     }
135
136     if(router) {
137         router->SetMap(pelist);
138         router->setDoneHandle(myDoneHandle);
139         //router->SetID(id);
140     }
141
142     //Start with all iterations done
143     doneFlag = 1;
144
145     //No Buffered doneInserting at the begining
146     bufferedDoneInserting = 0;
147 }
148
149 void RouterStrategy::insertMessage(MessageHolder *cmsg){
150
151     if(routerID == USE_DIRECT) {
152         if(cmsg->dest_proc == IS_MULTICAST) {
153             for(int count = 0; count < cmsg->npes-1; count ++)
154                 CmiSyncSend(cmsg->pelist[count], cmsg->size, 
155                             cmsg->getMessage());
156             if(cmsg->npes > 0)
157                 CmiSyncSendAndFree(cmsg->pelist[cmsg->npes-1], cmsg->size, 
158                                    cmsg->getMessage());
159         }
160         else
161             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
162                                cmsg->getMessage());
163         delete cmsg;
164     }
165     else
166         msgQ.push(cmsg);
167 }
168
169 void RouterStrategy::doneInserting(){
170     
171     id.instanceID = getInstance();
172
173     ComlibPrintf("Instance ID = %d\n", getInstance());
174
175     if(routerID == USE_DIRECT)
176         return;
177
178     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
179     
180     if(doneFlag == 0) {
181         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
182                      CkMyPe());
183         bufferedDoneInserting = 1;
184         return;
185     }
186     
187     doneFlag = 0;
188     bufferedDoneInserting = 0;
189
190     id.refno ++;
191
192     if(msgQ.length() == 0) {
193         if(routerID == USE_DIRECT)
194             return;
195
196         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
197         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
198         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
199         
200         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
201                                                      myPe, 
202                                                      sizeof(DummyMsg));
203         cmsg->isDummy = 1;
204         msgQ.push(cmsg);
205     }
206
207     int numToDeposit = msgQ.length();
208     
209     while(!msgQ.isEmpty()) {
210         MessageHolder *cmsg = msgQ.deq();
211         char *msg = cmsg->getMessage();
212         
213         if(!cmsg->isDummy)  {            
214             //Assuming list of processors to multicast to is in the
215             //order of relative processors numbering and NOT absolute
216             //processor numbering
217
218             if(cmsg->dest_proc == IS_MULTICAST) {  
219                 router->EachToManyMulticast(id, cmsg->size, msg, cmsg->npes, 
220                                             cmsg->pelist, 
221                                             numToDeposit > 1);
222             }            
223             else {                                
224                 
225                 ComlibPrintf("%d: Insert Pers. Message to %d\n", CkMyPe(), procMap[cmsg->dest_proc]);
226                              router->EachToManyMulticast(id, cmsg->size, msg, 1,
227                                             &procMap[cmsg->dest_proc],
228                                             numToDeposit > 1);
229             }            
230         }   
231         else
232             router->EachToManyMulticast(id, cmsg->size, msg, 1, &myPe, 
233                                         numToDeposit > 1);
234         
235         numToDeposit --;
236         delete cmsg;        
237     }
238
239     while(!recvQ.isEmpty()) {
240         char *msg = recvQ.deq();
241         RecvManyMsg(msg);
242     }
243
244     while(!procQ.isEmpty()) {
245         char *msg = procQ.deq();
246         ProcManyMsg(msg);
247     }
248
249     while(!dummyQ.isEmpty() > 0) {
250         DummyMsg *m = dummyQ.deq();
251         router->DummyEP(m->id, m->magic);
252         CmiFree(m);
253     }
254 }
255
256 void RouterStrategy::Done(DummyMsg *m){
257
258     ComlibPrintf("%d: Finished iteration\n", CkMyPe());
259
260     if(doneHandle > 0) {
261         CmiSetHandler(m, doneHandle);
262         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
263     }
264     else
265         CmiFree(m);
266
267     doneFlag = 1;
268
269     if(bufferedDoneInserting)
270         doneInserting();
271 }
272
273
274 //Implement it later while implementing checkpointing of Comlib
275 void RouterStrategy::pup(PUP::er &p){}
276
277 //Call the router functions
278 void RouterStrategy::RecvManyMsg(char *msg) {
279
280     //comID new_id;
281     int new_refno =0;
282
283     //FOO BAR when structure of comid changes this will break !!!!!
284     ComlibPrintf("In RecvManyMsg at %d\n", CkMyPe());
285     //memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
286     //ComlibPrintf("REFNO = %d, %d\n", new_id.refno, id.refno);
287     
288     //First int in comid is refno
289     memcpy(&new_refno, (char*) msg + CmiReservedHeaderSize + sizeof(int), 
290            sizeof(int)); 
291
292     if(new_refno != id.refno)
293         recvQ.push(msg);
294     else
295         router->RecvManyMsg(id, msg);
296 }
297
298 void RouterStrategy::ProcManyMsg(char *msg) {    
299
300     //comID new_id;
301     int new_refno =0;
302     ComlibPrintf("In ProcManyMsg at %d\n", CkMyPe());
303     //memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
304     //First int in comid is refno
305     memcpy(&new_refno, (char*) msg + CmiReservedHeaderSize + sizeof(int), 
306            sizeof(int)); 
307     
308     if(new_refno != id.refno)
309         procQ.push(msg);
310     else
311         router->ProcManyMsg(id, msg);
312 }
313
314 void RouterStrategy::DummyEP(DummyMsg *m) {
315
316     if(id.refno != m->id.refno)
317         dummyQ.push(m);
318     else {
319         router->DummyEP(m->id, m->magic);
320         CmiFree(m);
321     }
322 }
323
324 PUPable_def(RouterStrategy);