Fixing some bugs in the new node aware multicast strategy.
[charm.git] / src / ck-com / OneTimeMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "OneTimeMulticastStrategy.h"
10 #include <string>
11 #include <set>
12 #include <vector>
13
14 #define DEBUG 1
15
16 CkpvExtern(CkGroupID, cmgrID);
17
18 OneTimeMulticastStrategy::OneTimeMulticastStrategy()
19   : Strategy(), CharmStrategy() {
20   //  ComlibPrintf("OneTimeMulticastStrategy constructor\n");
21   setType(ARRAY_STRATEGY);
22 }
23
24 OneTimeMulticastStrategy::~OneTimeMulticastStrategy() {
25 }
26
27 void OneTimeMulticastStrategy::pup(PUP::er &p){
28   Strategy::pup(p);
29   CharmStrategy::pup(p);
30 }
31
32
33 /** Called when the user invokes the entry method on the delegated proxy. */
34 void OneTimeMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
35   CkPrintf("[%d] OneTimeMulticastStrategy::insertMessage\n", CkMyPe());
36   fflush(stdout);
37   
38   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
39     CkAbort("OneTimeMulticastStrategy can only be used with an array section proxy");
40   }
41     
42   // Create a multicast message containing all information about remote destination objects 
43   int needSort = 0;
44   ComlibMulticastMsg * multMsg = sinfo.getNewMulticastMessage(cmsg, needSort, getInstance());
45   
46   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
47   localMulticast(cmsg);
48   
49   // The remote multicast method will send the message to the remote PEs, as specified in multMsg
50   remoteMulticast(multMsg, true);
51    
52   delete cmsg;    
53 }
54
55
56
57 /** Deliver the message to the local elements. */
58 void OneTimeMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
59   double start = CmiWallTimer();
60   CkSectionID *sec_id = cmsg->sec_id;
61   CkVec< CkArrayIndexMax > localIndices;
62   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
63   deliverToIndices(cmsg->getCharmMessage(), localIndices );
64   traceUserBracketEvent(10000, start, CmiWallTimer());
65 }
66
67
68
69
70
71 /** 
72     Forward multicast message to our successor processors in the spanning tree. 
73     Uses CmiSyncListSendAndFree for delivery to this strategy's OneTimeMulticastStrategy::handleMessage method.
74 */
75 void OneTimeMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE) {
76   double start = CmiWallTimer();
77
78   envelope *env = UsrToEnv(multMsg);
79     
80   
81   /// The index into the PE list in the message
82   int myIndex = -10000; 
83   const int totalDestPEs = multMsg->nPes;
84   const int myPe = CkMyPe();
85   
86   // Find my index in the list of all destination PEs
87   if(rootPE){
88     myIndex = -1;
89   } else {
90     for (int i=0; i<totalDestPEs; ++i) {
91       if(multMsg->indicesCount[i].pe == myPe){
92         myIndex = i;
93         break;
94       }
95     }
96   }
97   
98   if(myIndex == -10000)
99     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
100   
101   int npes;
102   int *pelist = NULL;
103
104   if(totalDestPEs > 0)
105     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes );
106   else
107     npes = 0;
108     
109
110   if(npes == 0) {
111 #if DEBUG
112     CkPrintf("[%d] OneTimeMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
113 #endif
114     traceUserBracketEvent(10001, start, CmiWallTimer());
115     CmiFree(env);
116     return;
117   }
118   
119   CmiSetHandler(env, CkpvAccess(comlib_handler));
120   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
121   CkPackMessage(&env);
122
123   //Collect Multicast Statistics
124   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
125   
126   CkAssert(npes > 0);
127   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
128   
129   delete[] pelist;
130   traceUserBracketEvent(10001, start, CmiWallTimer());
131
132 }
133
134
135
136 /** 
137     Receive an incoming multicast message(sent from OneTimeMulticastStrategy::remoteMulticast).
138     Deliver the message to all local elements 
139 */
140 void OneTimeMulticastStrategy::handleMessage(void *msg){
141 #if DEBUG
142   //  CkPrintf("[%d] OneTimeMulticastStrategy::handleMessage\n", CkMyPe());
143 #endif
144   envelope *env = (envelope *)msg;
145   CkUnpackMessage(&env);
146   
147   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
148   
149   // Don't use msg after this point. Instead use the unpacked env
150   
151   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
152   
153   // Deliver to objects marked as local in the message
154   int localElems;
155   envelope *newenv;
156   CkArrayIndexMax *local_idx_list;  
157   sinfo.unpack(env, localElems, local_idx_list, newenv);
158   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
159   deliverToIndices(newmsg, localElems, local_idx_list );
160   
161   // Forward on to other processors if necessary
162   remoteMulticast(multMsg, false);
163
164 }
165
166
167
168
169 void OneTimeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
170   if(myIndex==-1){
171     // We are at a root node of the spanning tree. 
172     // We will forward the message to all other PEs in the destination list.
173     npes = totalDestPEs;
174     
175     pelist = new int[npes];
176     for (int i=0; i<npes; ++i) {
177       pelist[i] = destPEs[i].pe;
178     }
179   } else {
180     // We are at a leaf node of the spanning tree. 
181     npes = 0;
182   }
183   
184 }
185
186
187 void OneTimeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
188   const int myPe = CkMyPe();
189
190   if(myIndex == totalDestPEs-1){
191     // Final PE won't send to anyone
192     npes = 0;
193     return;
194   } else {
195     // All non-final PEs will send to next PE in list
196     npes = 1;
197     pelist = new int[1];
198     pelist[0] = destPEs[myIndex+1].pe;
199   }
200
201 }
202
203
204 void OneTimeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
205   const int myPe = CkMyPe();
206   
207   // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
208   
209   int sendLogicalIndexStart = degree*(myIndex+1) + 1;       // inclusive
210   int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
211   
212   if(sendLogicalIndexEnd-1 >= totalDestPEs){
213     sendLogicalIndexEnd = totalDestPEs;
214   }
215
216   int numSend = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
217   if(numSend <= 0){
218     npes = 0;
219     return;
220   }
221  
222 #if DEBUG
223   if(numSend > 0)
224     CkPrintf("Tree logical index %d sending to logical %d to %d (totalDestPEs excluding root=%d)  numSend=%d\n",
225              myIndex+1, sendLogicalIndexStart, sendLogicalIndexEnd, totalDestPEs, numSend);
226 #endif
227
228   npes = numSend;
229   pelist = new int[npes];
230   
231   for(int i=0;i<numSend;i++){
232     CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
233     pelist[i] = destPEs[sendLogicalIndexStart-1+i].pe;
234 #if DEBUG
235     CkPrintf("Tree logical index %d sending to PE %d\n", myIndex+1, pelist[i]);
236 #endif
237     CkAssert(pelist[i] < CkNumPes());
238   }
239   
240 }
241
242
243 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
244 int getFirstPeOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
245   int num;
246   int *nodePeList;
247   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
248   
249   for(int i=0;i<num;i++){
250     // Scan destPEs for the pe
251     int p = nodePeList[i];
252     
253     for(int j=0;j<totalDestPEs;j++){
254       if(p == destPEs[j].pe){
255         // found the representative PE for the node that is in the destPEs list
256         return p;
257       }
258     }
259   }
260   
261   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
262   return -1;
263 }
264
265
266
267 /** List all the other PEs from the list that share the physical node */
268 std::vector<int> getOtherPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
269   
270   std::vector<int> result;
271
272   int num;
273   int *nodePeList;
274   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
275   
276   for(int i=0;i<num;i++){
277     // Scan destPEs for the pe
278     int p = nodePeList[i];
279     if(p != pe){
280       for(int j=0;j<totalDestPEs;j++){
281         if(p == destPEs[j].pe){
282           // found the representative PE for the node that is in the destPEs list
283           result.push_back(p);
284           break;
285         }
286       }
287     }
288   }
289   
290   return result;
291 }
292
293
294 void OneTimeNodeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
295   const int myPe = CkMyPe();
296
297   std::set<int> nodePERepresentatives;
298   
299   // create a list of PEs, with one for each node to which the message must be sent
300   for(int i=0; i<totalDestPEs; i++){
301     int pe = destPEs[i].pe;
302     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
303     nodePERepresentatives.insert(representative);    
304   }
305   
306   int numRepresentativePEs = nodePERepresentatives.size();
307   
308   int repForMyPe=-1;
309   if(myIndex != -1)
310     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
311   
312 #if DEBUG
313   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
314   fflush(stdout);
315 #endif
316   
317   // If this PE is part of the multicast tree, then it should forward the message along
318   if(CkMyPe() == repForMyPe || myIndex == -1){
319     // I am an internal node in the multicast tree
320     
321     // flatten the data structure for nodePERepresentatives
322     int *repPeList = new int[numRepresentativePEs];
323     int myRepIndex = -1;
324     std::set<int>::iterator iter;
325     int p=0;
326     for(iter=nodePERepresentatives.begin(); iter != nodePERepresentatives.end(); iter++){
327       repPeList[p] = *iter;
328       if(*iter == repForMyPe)
329         myRepIndex = p;
330       p++;
331     }
332     CkAssert(myRepIndex >=0 || myIndex==-1);
333       
334     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
335     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
336     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
337     
338     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
339       sendLogicalIndexEnd = numRepresentativePEs;
340     }
341     
342     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
343     if(numSendTree < 0)
344       numSendTree = 0;
345     
346     std::vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
347     int numSendLocal;
348     if(myIndex == -1)
349       numSendLocal = 0;
350     else 
351       numSendLocal = otherLocalPes.size();
352     
353     
354
355 #if DEBUG
356     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
357     fflush(stdout);
358 #endif
359
360     int numSend = numSendTree + numSendLocal;
361     if(numSend <= 0){
362       npes = 0;
363       return;
364     }
365     
366     npes = numSend;
367     pelist = new int[npes];
368   
369     for(int i=0;i<numSendTree;i++){
370       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
371       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
372       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
373     }
374     
375     delete[] repPeList;
376     repPeList = NULL;
377
378     for(int i=0;i<numSendLocal;i++){
379       pelist[i+numSendTree] = otherLocalPes[i];
380       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
381     }
382     
383     
384 #if 1
385     char buf[1024];
386     sprintf(buf, "PE %d is sending to PEs: ", CkMyPe() );
387     for(int i=0;i<numSend;i++){
388       sprintf(buf+strlen(buf), "%d ", pelist[i]);
389     }    
390     CkPrintf("%s\n", buf);
391     fflush(stdout);
392 #endif
393         
394   } else {
395     // We are a leaf PE
396     npes = 0;
397     return;
398   }
399
400   
401   
402 }
403
404
405
406 /*@}*/