added a variable higherLevel and a method deliverer to be used to uniform the
[charm.git] / src / conv-com / routerstrategy.C
1
2 #include "routerstrategy.h"
3
4 CkpvDeclare(int, RecvHandle);
5 CkpvDeclare(int, ProcHandle);
6 CkpvDeclare(int, DummyHandle);
7
8 //Handlers that call the entry funtions of routers 
9 //Refer to router.h for details on these entry functions
10
11 //Correspods to Router::ProcManyMsg
12 void procManyCombinedMsg(char *msg)
13 {
14     comID id;
15     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
16     memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
17     
18     Strategy *s = ConvComlibGetStrategy(id.instanceID);
19     ((RouterStrategy *)s)->ProcManyMsg(msg);
20 }
21
22 //Correspods to Router::DummyEP
23 void dummyEP(DummyMsg *m)
24 {
25     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
26     
27     ((RouterStrategy *)s)->DummyEP(m);
28 }
29
30 //Correspods to Router::RecvManyMsg
31 void recvManyCombinedMsg(char *msg)
32 {
33     comID id;
34     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
35     memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
36     
37     Strategy *s = ConvComlibGetStrategy(id.instanceID);
38     ((RouterStrategy *)s)->RecvManyMsg(msg);
39 }
40
41
42 void doneHandler(DummyMsg *m){
43     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
44     
45     ((RouterStrategy *)s)->Done(m);
46 }
47
48 void RouterStrategy::setReverseMap(){
49     int pcount;
50     for(pcount = 0; pcount < CkNumPes(); pcount++)
51         procMap[pcount] = -1;
52
53     //All processors not in the domain will point to -1
54     for(pcount = 0; pcount < npes; pcount++) {
55         if (pelist[pcount] == CmiMyPe())
56             myPe = pelist[pcount];
57
58         procMap[pelist[pcount]] = pcount;
59     }
60 }
61
62 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
63                                int *_pelist) 
64     : Strategy(){
65     
66     setType(CONVERSE_STRATEGY);
67
68     CkpvInitialize(int, RecvHandle);
69     CkpvInitialize(int, ProcHandle);
70     CkpvInitialize(int, DummyHandle);
71
72     id.instanceID = 0; //Set later in doneInserting
73     
74     id.isAllToAll = 0;
75     id.refno = 0;
76
77     CkpvAccess(RecvHandle) =
78         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
79     CkpvAccess(ProcHandle) =
80         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
81     CkpvAccess(DummyHandle) = 
82         CkRegisterHandler((CmiHandler)dummyEP);    
83
84     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
85
86     //Array strategy done handle
87     doneHandle = handle;
88
89     routerID = stratid;
90
91     npes = _npes;
92     pelist = new int[npes];
93     procMap = new int[CmiNumPes()];
94     
95     memcpy(pelist, _pelist, sizeof(int) * npes);    
96     setReverseMap();
97
98     //CkPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
99     //       myPe, npes);
100
101     switch(stratid) {
102     case USE_TREE: 
103         router = new TreeRouter(npes, myPe);
104         break;
105         
106     case USE_MESH:
107         router = new GridRouter(npes, myPe);
108         break;
109         
110     case USE_HYPERCUBE:
111         router = new DimexRouter(npes, myPe);
112         break;
113         
114     case USE_GRID:
115         router = new D3GridRouter(npes, myPe);
116         break;
117
118     case USE_DIRECT: router = NULL;
119         break;
120         
121     default: CmiAbort("Unknown Strategy\n");
122         break;
123     }
124
125     if(router) {
126         router->SetMap(pelist);
127         router->setDoneHandle(myDoneHandle);
128         //router->SetID(id);
129     }
130
131     //Start with all iterations done
132     doneFlag = 1;
133
134     //No Buffered doneInserting at the begining
135     bufferedDoneInserting = 0;
136 }
137
138 void RouterStrategy::insertMessage(MessageHolder *cmsg){
139
140     if(routerID == USE_DIRECT) {
141         CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, cmsg->getMessage());
142         delete cmsg;
143     }
144     else
145         msgQ.enq(cmsg);
146 }
147
148 void RouterStrategy::doneInserting(){
149     
150     id.instanceID = getInstance();
151
152     ComlibPrintf("Instance ID = %d\n", getInstance());
153
154     if(routerID == USE_DIRECT)
155         return;
156
157     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
158     
159     if(doneFlag == 0) {
160         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
161                      CmiMyPe());
162         bufferedDoneInserting = 1;
163         return;
164     }
165     
166     doneFlag = 0;
167     bufferedDoneInserting = 0;
168
169     id.refno ++;
170
171     if(msgQ.length() == 0) {
172         if(routerID == USE_DIRECT)
173             return;
174
175         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
176         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
177         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
178         
179         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
180                                                      CkMyPe(), 
181                                                      sizeof(DummyMsg));
182         cmsg->isDummy = 1;
183         msgQ.enq(cmsg);
184     }
185
186     int numToDeposit = msgQ.length();
187     
188     while(!msgQ.isEmpty()) {
189         MessageHolder *cmsg = msgQ.deq();
190         char *msg = cmsg->getMessage();
191         
192         if(!cmsg->isDummy)  {            
193             //Assuming list of processors to multicast to is in the
194             //order of relative processors numbering and NOT absolute
195             //processor numbering
196
197             if(cmsg->dest_proc == IS_MULTICAST) {  
198                 router->EachToManyMulticast(id, cmsg->size, msg, cmsg->npes, 
199                                             cmsg->pelist, 
200                                             numToDeposit > 1);
201             }            
202             else {                                
203                 router->EachToManyMulticast(id, cmsg->size, msg, 1,
204                                             &procMap[cmsg->dest_proc],
205                                             numToDeposit > 1);
206             }            
207         }   
208         else
209             router->EachToManyMulticast(id, cmsg->size, msg, 1, &myPe, 
210                                         numToDeposit > 1);
211         
212         numToDeposit --;        
213     }
214
215     while(!recvQ.isEmpty()) {
216         char *msg = recvQ.deq();
217         RecvManyMsg(msg);
218     }
219
220     while(!procQ.isEmpty()) {
221         char *msg = procQ.deq();
222         ProcManyMsg(msg);
223     }
224
225     while(!dummyQ.isEmpty() > 0) {
226         DummyMsg *m = dummyQ.deq();
227         router->DummyEP(m->id, m->magic);
228         CmiFree(m);
229     }
230 }
231
232 void RouterStrategy::Done(DummyMsg *m){
233
234     ComlibPrintf("%d: Finished iteration\n", CmiMyPe());
235
236     if(doneHandle > 0) {
237         CmiSetHandler(m, doneHandle);
238         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
239     }
240     else
241         CmiFree(m);
242
243     doneFlag = 1;
244
245     if(bufferedDoneInserting)
246         doneInserting();
247 }
248
249
250 //Implement it later while implementing checkpointing of Comlib
251 void RouterStrategy::pup(PUP::er &p){}
252
253 //Call the router functions
254 void RouterStrategy::RecvManyMsg(char *msg) {
255
256     comID new_id;
257     ComlibPrintf("In RecvManyMsg at %d\n", CkMyPe());
258     memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
259
260     ComlibPrintf("REFNO = %d, %d\n", new_id.refno, id.refno);
261
262     if(new_id.refno != id.refno)
263         recvQ.enq(msg);
264     else
265         router->RecvManyMsg(id, msg);
266 }
267
268 void RouterStrategy::ProcManyMsg(char *msg) {    
269
270     comID new_id;
271     ComlibPrintf("In ProcManyMsg at %d\n", CkMyPe());
272     memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
273
274     if(new_id.refno != id.refno)
275         procQ.enq(msg);
276     else
277         router->ProcManyMsg(id, msg);
278 }
279
280 void RouterStrategy::DummyEP(DummyMsg *m) {
281
282     if(id.refno != m->id.refno)
283         dummyQ.enq(m);
284     else {
285         router->DummyEP(m->id, m->magic);
286         CmiFree(m);
287     }
288 }
289
290 PUPable_def(RouterStrategy);