024ea7e9a8d5e76df44a093da36e2ee329399b0c
[charm.git] / src / ck-com / OneTimeMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "OneTimeMulticastStrategy.h"
10 #include <string>
11 #include <set>
12 #include <vector>
13
14 //#define DEBUG 1
15
16 CkpvExtern(CkGroupID, cmgrID);
17
18 OneTimeMulticastStrategy::OneTimeMulticastStrategy()
19   : Strategy(), CharmStrategy() {
20   //  ComlibPrintf("OneTimeMulticastStrategy constructor\n");
21   setType(ARRAY_STRATEGY);
22 }
23
24 OneTimeMulticastStrategy::~OneTimeMulticastStrategy() {
25 }
26
27 void OneTimeMulticastStrategy::pup(PUP::er &p){
28   Strategy::pup(p);
29   CharmStrategy::pup(p);
30 }
31
32
33 /** Called when the user invokes the entry method on the delegated proxy. */
34 void OneTimeMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
35 #if DEBUG
36   CkPrintf("[%d] OneTimeMulticastStrategy::insertMessage\n", CkMyPe());
37   fflush(stdout);
38 #endif 
39
40   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
41     CkAbort("OneTimeMulticastStrategy can only be used with an array section proxy");
42   }
43     
44   // Create a multicast message containing all information about remote destination objects 
45   int needSort = 0;
46   ComlibMulticastMsg * multMsg = sinfo.getNewMulticastMessage(cmsg, needSort, getInstance());
47   
48   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
49   localMulticast(cmsg);
50   
51   // The remote multicast method will send the message to the remote PEs, as specified in multMsg
52   remoteMulticast(multMsg, true);
53    
54   delete cmsg;    
55 }
56
57
58
59 /** Deliver the message to the local elements. */
60 void OneTimeMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
61   double start = CmiWallTimer();
62   CkSectionID *sec_id = cmsg->sec_id;
63   CkVec< CkArrayIndexMax > localIndices;
64   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
65   deliverToIndices(cmsg->getCharmMessage(), localIndices );
66   traceUserBracketEvent(10000, start, CmiWallTimer());
67 }
68
69
70
71
72
73 /** 
74     Forward multicast message to our successor processors in the spanning tree. 
75     Uses CmiSyncListSendAndFree for delivery to this strategy's OneTimeMulticastStrategy::handleMessage method.
76 */
77 void OneTimeMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE) {
78   double start = CmiWallTimer();
79
80   envelope *env = UsrToEnv(multMsg);
81     
82   
83   /// The index into the PE list in the message
84   int myIndex = -10000; 
85   const int totalDestPEs = multMsg->nPes;
86   const int myPe = CkMyPe();
87   
88   // Find my index in the list of all destination PEs
89   if(rootPE){
90     myIndex = -1;
91   } else {
92     for (int i=0; i<totalDestPEs; ++i) {
93       if(multMsg->indicesCount[i].pe == myPe){
94         myIndex = i;
95         break;
96       }
97     }
98   }
99   
100   if(myIndex == -10000)
101     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
102   
103   int npes;
104   int *pelist = NULL;
105
106   if(totalDestPEs > 0)
107     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes );
108   else {
109     npes = 0;
110   }
111
112   if(npes == 0) {
113 #if DEBUG
114     CkPrintf("[%d] OneTimeMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
115 #endif
116     traceUserBracketEvent(10001, start, CmiWallTimer());
117     CmiFree(env);
118     return;
119   }
120   
121   CmiSetHandler(env, CkpvAccess(comlib_handler));
122   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
123   CkPackMessage(&env);
124
125   //Collect Multicast Statistics
126   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
127   
128   CkAssert(npes > 0);
129   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
130   
131   delete[] pelist;
132   traceUserBracketEvent(10001, start, CmiWallTimer());
133
134 }
135
136
137
138 /** 
139     Receive an incoming multicast message(sent from OneTimeMulticastStrategy::remoteMulticast).
140     Deliver the message to all local elements 
141 */
142 void OneTimeMulticastStrategy::handleMessage(void *msg){
143 #if DEBUG
144   //  CkPrintf("[%d] OneTimeMulticastStrategy::handleMessage\n", CkMyPe());
145 #endif
146   envelope *env = (envelope *)msg;
147   CkUnpackMessage(&env);
148   
149   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
150   
151   // Don't use msg after this point. Instead use the unpacked env
152   
153   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
154   
155   // Deliver to objects marked as local in the message
156   int localElems;
157   envelope *newenv;
158   CkArrayIndexMax *local_idx_list;  
159   sinfo.unpack(env, localElems, local_idx_list, newenv);
160   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
161   deliverToIndices(newmsg, localElems, local_idx_list );
162   
163   // Forward on to other processors if necessary
164   remoteMulticast(multMsg, false);
165
166 }
167
168
169
170
171 void OneTimeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
172   if(myIndex==-1){
173     // We are at a root node of the spanning tree. 
174     // We will forward the message to all other PEs in the destination list.
175     npes = totalDestPEs;
176     
177     pelist = new int[npes];
178     for (int i=0; i<npes; ++i) {
179       pelist[i] = destPEs[i].pe;
180     }
181   } else {
182     // We are at a leaf node of the spanning tree. 
183     npes = 0;
184   }
185   
186 }
187
188
189 void OneTimeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
190   const int myPe = CkMyPe();
191
192   if(myIndex == totalDestPEs-1){
193     // Final PE won't send to anyone
194     npes = 0;
195     return;
196   } else {
197     // All non-final PEs will send to next PE in list
198     npes = 1;
199     pelist = new int[1];
200     pelist[0] = destPEs[myIndex+1].pe;
201   }
202
203 }
204
205
206 void OneTimeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
207   const int myPe = CkMyPe();
208   
209   // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
210   
211   int sendLogicalIndexStart = degree*(myIndex+1) + 1;       // inclusive
212   int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
213   
214   if(sendLogicalIndexEnd-1 >= totalDestPEs){
215     sendLogicalIndexEnd = totalDestPEs;
216   }
217
218   int numSend = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
219   if(numSend <= 0){
220     npes = 0;
221     return;
222   }
223  
224 #if DEBUG
225   if(numSend > 0)
226     CkPrintf("Tree logical index %d sending to logical %d to %d (totalDestPEs excluding root=%d)  numSend=%d\n",
227              myIndex+1, sendLogicalIndexStart, sendLogicalIndexEnd, totalDestPEs, numSend);
228 #endif
229
230   npes = numSend;
231   pelist = new int[npes];
232   
233   for(int i=0;i<numSend;i++){
234     CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
235     pelist[i] = destPEs[sendLogicalIndexStart-1+i].pe;
236 #if DEBUG
237     CkPrintf("Tree logical index %d sending to PE %d\n", myIndex+1, pelist[i]);
238 #endif
239     CkAssert(pelist[i] < CkNumPes());
240   }
241   
242 }
243
244
245 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
246 int getFirstPeOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
247   int num;
248   int *nodePeList;
249   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
250   
251   for(int i=0;i<num;i++){
252     // Scan destPEs for the pe
253     int p = nodePeList[i];
254     
255     for(int j=0;j<totalDestPEs;j++){
256       if(p == destPEs[j].pe){
257         // found the representative PE for the node that is in the destPEs list
258         return p;
259       }
260     }
261   }
262   
263   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
264   return -1;
265 }
266
267
268
269 /** List all the other PEs from the list that share the physical node */
270 std::vector<int> getOtherPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
271   
272   std::vector<int> result;
273
274   int num;
275   int *nodePeList;
276   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
277   
278   for(int i=0;i<num;i++){
279     // Scan destPEs for the pe
280     int p = nodePeList[i];
281     if(p != pe){
282       for(int j=0;j<totalDestPEs;j++){
283         if(p == destPEs[j].pe){
284           // found the representative PE for the node that is in the destPEs list
285           result.push_back(p);
286           break;
287         }
288       }
289     }
290   }
291   
292   return result;
293 }
294
295
296 void OneTimeNodeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
297   const int myPe = CkMyPe();
298
299   std::set<int> nodePERepresentatives;
300   
301   // create a list of PEs, with one for each node to which the message must be sent
302   for(int i=0; i<totalDestPEs; i++){
303     int pe = destPEs[i].pe;
304     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
305     nodePERepresentatives.insert(representative);    
306   }
307   
308   int numRepresentativePEs = nodePERepresentatives.size();
309   
310   int repForMyPe=-1;
311   if(myIndex != -1)
312     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
313   
314 #if DEBUG
315   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
316   fflush(stdout);
317 #endif
318   
319   // If this PE is part of the multicast tree, then it should forward the message along
320   if(CkMyPe() == repForMyPe || myIndex == -1){
321     // I am an internal node in the multicast tree
322     
323     // flatten the data structure for nodePERepresentatives
324     int *repPeList = new int[numRepresentativePEs];
325     int myRepIndex = -1;
326     std::set<int>::iterator iter;
327     int p=0;
328     for(iter=nodePERepresentatives.begin(); iter != nodePERepresentatives.end(); iter++){
329       repPeList[p] = *iter;
330       if(*iter == repForMyPe)
331         myRepIndex = p;
332       p++;
333     }
334     CkAssert(myRepIndex >=0 || myIndex==-1);
335       
336     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
337     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
338     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
339     
340     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
341       sendLogicalIndexEnd = numRepresentativePEs;
342     }
343     
344     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
345     if(numSendTree < 0)
346       numSendTree = 0;
347     
348     std::vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
349     int numSendLocal;
350     if(myIndex == -1)
351       numSendLocal = 0;
352     else 
353       numSendLocal = otherLocalPes.size();
354     
355     
356
357 #if DEBUG
358     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
359     fflush(stdout);
360 #endif
361
362     int numSend = numSendTree + numSendLocal;
363     if(numSend <= 0){
364       npes = 0;
365       return;
366     }
367     
368     npes = numSend;
369     pelist = new int[npes];
370   
371     for(int i=0;i<numSendTree;i++){
372       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
373       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
374       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
375     }
376     
377     delete[] repPeList;
378     repPeList = NULL;
379
380     for(int i=0;i<numSendLocal;i++){
381       pelist[i+numSendTree] = otherLocalPes[i];
382       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
383     }
384     
385     
386 #if DEBUG
387     char buf[1024];
388     sprintf(buf, "PE %d is sending to PEs: ", CkMyPe() );
389     for(int i=0;i<numSend;i++){
390       sprintf(buf+strlen(buf), "%d ", pelist[i]);
391     }    
392     CkPrintf("%s\n", buf);
393     fflush(stdout);
394 #endif
395         
396   } else {
397     // We are a leaf PE
398     npes = 0;
399     return;
400   }
401
402   
403   
404 }
405
406
407
408 /*@}*/