Adding a new handler which makes it easy for strategies to
[charm.git] / src / conv-com / routerstrategy.C
1
2 #include "routerstrategy.h"
3
4 CkpvDeclare(int, RecvHandle);
5 CkpvDeclare(int, ProcHandle);
6 CkpvDeclare(int, DummyHandle);
7
8 //Handlers that call the entry funtions of routers 
9 //Refer to router.h for details on these entry functions
10
11 //Correspods to Router::ProcManyMsg
12 void procManyCombinedMsg(char *msg)
13 {
14     //comID id;
15     int instance_id;
16
17     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
18     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
19
20     //Comid specific
21     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
22            , sizeof(int));
23
24     Strategy *s = ConvComlibGetStrategy(instance_id);
25     ((RouterStrategy *)s)->ProcManyMsg(msg);
26 }
27
28 //Correspods to Router::DummyEP
29 void dummyEP(DummyMsg *m)
30 {
31     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
32     
33     ((RouterStrategy *)s)->DummyEP(m);
34 }
35
36 //Correspods to Router::RecvManyMsg
37 void recvManyCombinedMsg(char *msg)
38 {
39     //comID id;
40     int instance_id;
41     ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
42     //memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
43     
44     //Comid specific
45     memcpy(&instance_id, (char*) msg + CmiReservedHeaderSize + 2*sizeof(int)
46            , sizeof(int));
47
48     Strategy *s = ConvComlibGetStrategy(instance_id);
49     ((RouterStrategy *)s)->RecvManyMsg(msg);
50 }
51
52
53 void doneHandler(DummyMsg *m){
54     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
55     
56     ((RouterStrategy *)s)->Done(m);
57 }
58
59 void RouterStrategy::setReverseMap(){
60     int pcount;
61     for(pcount = 0; pcount < CkNumPes(); pcount++)
62         procMap[pcount] = -1;
63
64     //All processors not in the domain will point to -1
65     for(pcount = 0; pcount < npes; pcount++) {
66         if (pelist[pcount] == CkMyPe())
67             myPe = pcount;
68
69         procMap[pelist[pcount]] = pcount;
70     }
71 }
72
73 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
74                                int *_pelist) 
75     : Strategy(){
76     
77     setType(CONVERSE_STRATEGY);
78
79     CkpvInitialize(int, RecvHandle);
80     CkpvInitialize(int, ProcHandle);
81     CkpvInitialize(int, DummyHandle);
82
83     id.instanceID = 0; //Set later in doneInserting
84     
85     id.isAllToAll = 0;
86     id.refno = 0;
87
88     CkpvAccess(RecvHandle) =
89         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
90     CkpvAccess(ProcHandle) =
91         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
92     CkpvAccess(DummyHandle) = 
93         CkRegisterHandler((CmiHandler)dummyEP);    
94
95     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
96
97     //Array strategy done handle
98     doneHandle = handle;
99
100     routerID = stratid;
101
102     npes = _npes;
103     //pelist = new int[npes];
104     pelist = _pelist;
105     //memcpy(pelist, _pelist, sizeof(int) * npes);    
106
107     if(npes <= 1)
108         routerID = USE_DIRECT;
109
110     myPe = -1;
111     procMap = new int[CkNumPes()];    
112     setReverseMap();
113
114     bcast_pemap = NULL;
115
116     ComlibPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
117                  myPe, npes);
118
119     if(myPe < 0) {
120         //I am not part of this strategy
121         
122         doneFlag = 0;
123         router = NULL;
124         bufferedDoneInserting = 0;
125         return;        
126     }
127
128     //Start with all iterations done
129     doneFlag = 1;
130     
131     //No Buffered doneInserting at the begining
132     bufferedDoneInserting = 0;
133
134     switch(stratid) {
135     case USE_TREE: 
136         router = new TreeRouter(npes, myPe);
137         break;
138         
139     case USE_MESH:
140         router = new GridRouter(npes, myPe);
141         break;
142         
143     case USE_HYPERCUBE:
144         router = new DimexRouter(npes, myPe);
145         break;
146         
147     case USE_GRID:
148         router = new D3GridRouter(npes, myPe);
149         break;
150
151     case USE_DIRECT: router = NULL;
152         break;
153         
154     default: CmiAbort("Unknown Strategy\n");
155         break;
156     }
157
158     if(router) {
159         router->SetMap(pelist);
160         router->setDoneHandle(myDoneHandle);
161         //router->SetID(id);
162     }
163 }
164
165
166 RouterStrategy::~RouterStrategy() {
167     //delete [] pelist;
168
169     if(bcast_pemap)
170         delete [] bcast_pemap;
171     
172     delete [] procMap;
173     if(router)
174         delete router;
175 }
176
177
178 void RouterStrategy::insertMessage(MessageHolder *cmsg){
179
180     if(myPe < 0)
181         CmiAbort("insertMessage: mype < 0\n");
182
183     int count = 0;
184     if(routerID == USE_DIRECT) {
185         if(cmsg->dest_proc == IS_BROADCAST) {
186             for(count = 0; count < cmsg->npes-1; count ++)
187                 CmiSyncSend(cmsg->pelist[count], cmsg->size, 
188                             cmsg->getMessage());
189             if(cmsg->npes > 0)
190                 CmiSyncSendAndFree(cmsg->pelist[cmsg->npes-1], cmsg->size, 
191                                    cmsg->getMessage());
192         }
193         else
194             CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, 
195                                cmsg->getMessage());
196         delete cmsg;
197     }
198     else {
199         if(cmsg->dest_proc >= 0) {
200             cmsg->pelist = &procMap[cmsg->dest_proc];
201             cmsg->npes = 1;
202         }
203         else if (cmsg->dest_proc == IS_BROADCAST){
204
205             if(bcast_pemap == NULL) {
206                 bcast_pemap = new int[npes];
207                 for(count = 0; count < npes; count ++) {
208                     bcast_pemap[count] = count;
209                 }
210             }
211
212             cmsg->pelist = bcast_pemap;
213             cmsg->npes = npes;
214         }
215         
216         msgQ.push(cmsg);
217     }
218 }
219
220 void RouterStrategy::doneInserting(){
221     
222     if(myPe < 0)
223         CmiAbort("insertMessage: mype < 0\n");
224
225     id.instanceID = getInstance();
226
227     //ComlibPrintf("Instance ID = %d\n", getInstance());
228     ComlibPrintf("%d: DoneInserting %d \n", CkMyPe(), msgQ.length());
229     
230     if(doneFlag == 0) {
231         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
232                      CkMyPe());
233         bufferedDoneInserting = 1;
234         return;
235     }
236     
237     if(routerID == USE_DIRECT) {
238         DummyMsg *m = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
239         memset((char *)m, 0, sizeof(DummyMsg)); 
240         m->id.instanceID = getInstance();
241         
242         Done(m);
243         return;
244     }
245
246     doneFlag = 0;
247     bufferedDoneInserting = 0;
248
249     id.refno ++;
250
251     if(msgQ.length() == 0) {
252         if(routerID == USE_DIRECT)
253             return;
254
255         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
256         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
257         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
258         
259         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
260                                                      myPe, 
261                                                      sizeof(DummyMsg));
262         cmsg->isDummy = 1;
263         cmsg->pelist = &myPe;
264         cmsg->npes = 1;
265         msgQ.push(cmsg);
266     }
267
268     router->EachToManyMulticastQ(id, msgQ);
269
270     while(!recvQ.isEmpty()) {
271         char *msg = recvQ.deq();
272         RecvManyMsg(msg);
273     }
274
275     while(!procQ.isEmpty()) {
276         char *msg = procQ.deq();
277         ProcManyMsg(msg);
278     }
279
280     while(!dummyQ.isEmpty() > 0) {
281         DummyMsg *m = dummyQ.deq();
282         router->DummyEP(m->id, m->magic);
283         CmiFree(m);
284     }
285 }
286
287 void RouterStrategy::Done(DummyMsg *m){
288
289     ComlibPrintf("%d: Finished iteration\n", CkMyPe());
290
291     if(doneHandle > 0) {
292         CmiSetHandler(m, doneHandle);
293         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
294     }
295     else
296         CmiFree(m);
297
298     doneFlag = 1;
299
300     if(bufferedDoneInserting)
301         doneInserting();
302 }
303
304
305 //Implement it later while implementing checkpointing of Comlib
306 void RouterStrategy::pup(PUP::er &p){}
307
308 PUPable_def(RouterStrategy);