Adding communication library in src/ck-com and src/conv-com
[charm.git] / src / conv-com / routerstrategy.C
1
2 #include "routerstrategy.h"
3
//Converse handler indices used by the router strategy. Each one is
//filled in by the RouterStrategy constructor with the value returned
//from CkRegisterHandler.

//Index of the handler registered for recvManyCombinedMsg
CkpvDeclare(int, RecvHandle);
//Index of the handler registered for procManyCombinedMsg
CkpvDeclare(int, ProcHandle);
//Index of the handler registered for dummyEP
CkpvDeclare(int, DummyHandle);
7
//Handlers that call the entry functions of routers
9 //Refer to router.h for details on these entry functions
10
//Corresponds to Router::ProcManyMsg
12 void procManyCombinedMsg(char *msg)
13 {
14     comID id;
15     //  ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
16     memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
17     
18     Strategy *s = ConvComlibGetStrategy(id.instanceID);
19     ((RouterStrategy *)s)->ProcManyMsg(msg);
20 }
21
//Corresponds to Router::DummyEP
23 void dummyEP(DummyMsg *m)
24 {
25     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
26     
27     ((RouterStrategy *)s)->DummyEP(m);
28 }
29
//Corresponds to Router::RecvManyMsg
31 void recvManyCombinedMsg(char *msg)
32 {
33     comID id;
34     //  ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
35     memcpy(&id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
36     
37     Strategy *s = ConvComlibGetStrategy(id.instanceID);
38     ((RouterStrategy *)s)->RecvManyMsg(msg);
39 }
40
41
42 void doneHandler(DummyMsg *m){
43     Strategy *s = ConvComlibGetStrategy(m->id.instanceID);
44     
45     ((RouterStrategy *)s)->Done(m);
46 }
47
48 void RouterStrategy::setReverseMap(){
49     int pcount;
50     for(pcount = 0; pcount < CkNumPes(); pcount++)
51         procMap[pcount] = -1;
52
53     //All processors not in the domain will point to -1
54     for(pcount = 0; pcount < npes; pcount++) {
55         if (pelist[pcount] == CmiMyPe())
56             myPe = pelist[pcount];
57
58         procMap[pelist[pcount]] = pcount;
59     }
60 }
61
62 RouterStrategy::RouterStrategy(int stratid, int handle, int _npes, 
63                                int *_pelist) 
64     : Strategy(){
65     
66     setType(CONVERSE_STRATEGY);
67
68     CkpvInitialize(int, RecvHandle);
69     CkpvInitialize(int, ProcHandle);
70     CkpvInitialize(int, DummyHandle);
71
72     id.instanceID = 0; //getInstance();
73     id.isAllToAll = 0;
74     id.refno = 0;
75
76     CkpvAccess(RecvHandle) =
77         CkRegisterHandler((CmiHandler)recvManyCombinedMsg);
78     CkpvAccess(ProcHandle) =
79         CkRegisterHandler((CmiHandler)procManyCombinedMsg);
80     CkpvAccess(DummyHandle) = 
81         CkRegisterHandler((CmiHandler)dummyEP);    
82
83     myDoneHandle = CkRegisterHandler((CmiHandler)doneHandler);    
84
85     //Array strategy done handle
86     doneHandle = handle;
87
88     routerID = stratid;
89
90     npes = _npes;
91     pelist = new int[npes];
92     procMap = new int[CmiNumPes()];
93     
94     memcpy(pelist, _pelist, sizeof(int) * npes);    
95     setReverseMap();
96
97     //CkPrintf("Router Strategy : %d, MYPE = %d, NUMPES = %d \n", stratid, 
98     //       myPe, npes);
99
100     switch(stratid) {
101     case USE_TREE: 
102         router = new TreeRouter(npes, myPe);
103         break;
104         
105     case USE_MESH:
106         router = new GridRouter(npes, myPe);
107         break;
108         
109     case USE_HYPERCUBE:
110         router = new DimexRouter(npes, myPe);
111         break;
112         
113     case USE_GRID:
114         router = new D3GridRouter(npes, myPe);
115         break;
116
117     case USE_DIRECT:
118         break;
119         
120     default: CmiAbort("Unknown Strategy\n");
121         break;
122     }
123
124     if(router) {
125         router->SetMap(pelist);
126         router->setDoneHandle(myDoneHandle);
127         //router->SetID(id);
128     }
129
130     //Start with all iterations done
131     doneFlag = 1;
132
133     //No Buffered doneInserting at the begining
134     bufferedDoneInserting = 0;
135 }
136
137 void RouterStrategy::insertMessage(MessageHolder *cmsg){
138
139     if(routerID == USE_DIRECT) {
140         CmiSyncSendAndFree(cmsg->dest_proc, cmsg->size, cmsg->getMessage());
141         delete cmsg;
142     }
143     else
144         msgQ.enq(cmsg);
145 }
146
147 void RouterStrategy::doneInserting(){
148     
149     if(routerID == USE_DIRECT)
150         return;
151
152     ComlibPrintf("%d: DoneInserting \n", CkMyPe());
153     
154     if(doneFlag == 0) {
155         ComlibPrintf("%d:Waiting for previous iteration to Finish\n", 
156                      CmiMyPe());
157         bufferedDoneInserting = 1;
158         return;
159     }
160     
161     doneFlag = 0;
162     bufferedDoneInserting = 0;
163
164     id.refno ++;
165
166     if(msgQ.length() == 0) {
167         if(routerID == USE_DIRECT)
168             return;
169
170         DummyMsg * dummymsg = (DummyMsg *)CmiAlloc(sizeof(DummyMsg));
171         ComlibPrintf("[%d] Creating a dummy message\n", CkMyPe());
172         CmiSetHandler(dummymsg, CkpvAccess(RecvdummyHandle));
173         
174         MessageHolder *cmsg = new MessageHolder((char *)dummymsg, 
175                                                      CkMyPe(), 
176                                                      sizeof(DummyMsg));
177         cmsg->isDummy = 1;
178         msgQ.enq(cmsg);
179     }
180
181     int numToDeposit = msgQ.length();
182     
183     while(!msgQ.isEmpty()) {
184         MessageHolder *cmsg = msgQ.deq();
185         char *msg = cmsg->getMessage();
186         
187         if(!cmsg->isDummy)  {            
188             //Assuming list of processors to multicast to is in the
189             //order of relative processors numbering and NOT absolute
190             //processor numbering
191
192             if(cmsg->dest_proc == IS_MULTICAST) {  
193                 router->EachToManyMulticast(id, cmsg->size, msg, cmsg->npes, 
194                                             cmsg->pelist, 
195                                             numToDeposit > 1);
196             }            
197             else {                                
198                 router->EachToManyMulticast(id, cmsg->size, msg, 1,
199                                             &procMap[cmsg->dest_proc],
200                                             numToDeposit > 1);
201             }            
202         }   
203         else
204             router->EachToManyMulticast(id, cmsg->size, msg, 1, &myPe, 
205                                         numToDeposit > 1);
206         
207         numToDeposit --;        
208     }
209
210     while(!recvQ.isEmpty()) {
211         char *msg = recvQ.deq();
212         RecvManyMsg(msg);
213     }
214
215     while(!procQ.isEmpty()) {
216         char *msg = procQ.deq();
217         ProcManyMsg(msg);
218     }
219
220     while(!dummyQ.isEmpty() > 0) {
221         DummyMsg *m = dummyQ.deq();
222         router->DummyEP(m->id, m->magic);
223         CmiFree(m);
224     }
225 }
226
227 void RouterStrategy::Done(DummyMsg *m){
228
229     ComlibPrintf("%d: Finished iteration\n", CmiMyPe());
230
231     if(doneHandle > 0) {
232         CmiSetHandler(m, doneHandle);
233         CmiSyncSendAndFree(CkMyPe(), sizeof(DummyMsg), (char*)m);
234     }
235     else
236         CmiFree(m);
237
238     doneFlag = 1;
239
240     if(bufferedDoneInserting)
241         doneInserting();
242 }
243
244
245 //Implement it later while implementing checkpointing of Comlib
246 void RouterStrategy::pup(PUP::er &p){}
247
248 //Call the router functions
249 void RouterStrategy::RecvManyMsg(char *msg) {
250
251     comID new_id;
252     //  ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
253     memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
254
255     ComlibPrintf("REFNO = %d, %d\n", new_id.refno, id.refno);
256
257     if(new_id.refno != id.refno)
258         recvQ.enq(msg);
259     else
260         router->RecvManyMsg(id, msg);
261 }
262
263 void RouterStrategy::ProcManyMsg(char *msg) {    
264
265     comID new_id;
266     //  ComlibPrintf("In Recv combined message at %d\n", CkMyPe());
267     memcpy(&new_id,(msg+CmiReservedHeaderSize+sizeof(int)), sizeof(comID));
268
269     if(new_id.refno != id.refno)
270         procQ.enq(msg);
271     else
272         router->ProcManyMsg(id, msg);
273 }
274
275 void RouterStrategy::DummyEP(DummyMsg *m) {
276
277     if(id.refno != m->id.refno)
278         dummyQ.enq(m);
279     else {
280         router->DummyEP(m->id, m->magic);
281         CmiFree(m);
282     }
283 }
284
//PUP definition for RouterStrategy. The macro is defined outside this
//file; presumably it pairs with a PUPable_decl in routerstrategy.h
//(TODO confirm).
PUPable_def(RouterStrategy);