Adding a new node aware one time multicast strategy.
[charm.git] / src / ck-com / OneTimeMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "OneTimeMulticastStrategy.h"
10 #include <string>
11 #include <set>
12
13 CkpvExtern(CkGroupID, cmgrID);
14
15 OneTimeMulticastStrategy::OneTimeMulticastStrategy()
16   : Strategy(), CharmStrategy() {
17   //  ComlibPrintf("OneTimeMulticastStrategy constructor\n");
18   setType(ARRAY_STRATEGY);
19 }
20
21 OneTimeMulticastStrategy::~OneTimeMulticastStrategy() {
22 }
23
24 void OneTimeMulticastStrategy::pup(PUP::er &p){
25   Strategy::pup(p);
26   CharmStrategy::pup(p);
27 }
28
29
30 /** Called when the user invokes the entry method on the delegated proxy. */
31 void OneTimeMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
32   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
33     CkAbort("OneTimeMulticastStrategy can only be used with an array section proxy");
34   }
35     
36   // Create a multicast message containing all information about remote destination objects 
37   int needSort = 0;
38   ComlibMulticastMsg * multMsg = sinfo.getNewMulticastMessage(cmsg, needSort, getInstance());
39   
40   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
41   localMulticast(cmsg);
42   
43   // The remote multicast method will send the message to the remote PEs, as specified in multMsg
44   remoteMulticast(multMsg, true);
45    
46   delete cmsg;    
47 }
48
49
50
51 /** Deliver the message to the local elements. */
52 void OneTimeMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
53   double start = CmiWallTimer();
54   CkSectionID *sec_id = cmsg->sec_id;
55   CkVec< CkArrayIndexMax > localIndices;
56   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
57   deliverToIndices(cmsg->getCharmMessage(), localIndices );
58   traceUserBracketEvent(10000, start, CmiWallTimer());
59 }
60
61
62
63
64
65 /** 
66     Forward multicast message to our successor processors in the spanning tree. 
67     Uses CmiSyncListSendAndFree for delivery to this strategy's OneTimeMulticastStrategy::handleMessage method.
68 */
69 void OneTimeMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE) {
70   double start = CmiWallTimer();
71
72   envelope *env = UsrToEnv(multMsg);
73     
74   int npes;
75   int *pelist;   
76   
77   /// The index into the PE list in the message
78   int myIndex = -10000; 
79   const int totalDestPEs = multMsg->nPes;
80   const int myPe = CkMyPe();
81   
82   // Find my index in the list of all destination PEs
83   if(rootPE){
84     myIndex = -1;
85   } else {
86     for (int i=0; i<totalDestPEs; ++i) {
87       if(multMsg->indicesCount[i].pe == myPe){
88         myIndex = i;
89         break;
90       }
91     }
92   }
93   
94   CkAssert(myIndex != -10000); // Sanity check
95   
96   determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes );
97   
98   if(npes == 0) {
99     traceUserBracketEvent(10001, start, CmiWallTimer());
100     CmiFree(env);
101     return;
102   }
103   
104   CmiSetHandler(env, CkpvAccess(comlib_handler));
105   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
106   CkPackMessage(&env);
107
108   //Collect Multicast Statistics
109   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
110   
111
112 #if DEBUG
113   for(int i=0;i<npes;i++){
114     CkPrintf("[%d] Multicast to %d  rootPE=%d\n", CkMyPe(), pelist[i], (int)rootPE);
115   }
116 #endif
117
118   //  if(npes > 0)
119   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
120   
121   delete[] pelist;
122   traceUserBracketEvent(10001, start, CmiWallTimer());
123
124 }
125
126
127
128 /** 
129     Receive an incoming multicast message(sent from OneTimeMulticastStrategy::remoteMulticast).
130     Deliver the message to all local elements 
131 */
132 void OneTimeMulticastStrategy::handleMessage(void *msg){
133   envelope *env = (envelope *)msg;
134   CkUnpackMessage(&env);
135   
136   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
137   
138   // Don't use msg after this point. Instead use the unpacked env
139   
140   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe());
141   
142   // Deliver to objects marked as local in the message
143   int localElems;
144   envelope *newenv;
145   CkArrayIndexMax *local_idx_list;  
146   sinfo.unpack(env, localElems, local_idx_list, newenv);
147   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
148   deliverToIndices(newmsg, localElems, local_idx_list );
149   
150   // Forward on to other processors if necessary
151   remoteMulticast( multMsg, false);
152
153 }
154
155
156
157
158 void OneTimeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
159   if(myIndex==-1){
160     // We are at a root node of the spanning tree. 
161     // We will forward the message to all other PEs in the destination list.
162     npes = totalDestPEs;
163     
164     pelist = new int[npes];
165     for (int i=0; i<npes; ++i) {
166       pelist[i] = destPEs[i].pe;
167     }
168   } else {
169     // We are at a leaf node of the spanning tree. 
170     npes = 0;
171   }
172   
173 }
174
175
176 void OneTimeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
177   const int myPe = CkMyPe();
178
179   if(myIndex == totalDestPEs-1){
180     // Final PE won't send to anyone
181     npes = 0;
182     return;
183   } else {
184     // All non-final PEs will send to next PE in list
185     npes = 1;
186     pelist = new int[1];
187     pelist[0] = destPEs[myIndex+1].pe;
188   }
189
190 }
191
192
193 void OneTimeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
194   const int myPe = CkMyPe();
195   
196   // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
197   
198   int sendLogicalIndexStart = degree*(myIndex+1) + 1;       // inclusive
199   int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
200   
201   if(sendLogicalIndexEnd-1 >= totalDestPEs){
202     sendLogicalIndexEnd = totalDestPEs;
203   }
204
205   int numSend = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
206   if(numSend <= 0){
207     npes = 0;
208     return;
209   }
210  
211 #if DEBUG
212   if(numSend > 0)
213     CkPrintf("Tree logical index %d sending to logical %d to %d (totalDestPEs excluding root=%d)  numSend=%d\n",
214              myIndex+1, sendLogicalIndexStart, sendLogicalIndexEnd, totalDestPEs, numSend);
215 #endif
216
217   npes = numSend;
218   pelist = new int[npes];
219   
220   for(int i=0;i<numSend;i++){
221     CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
222     pelist[i] = destPEs[sendLogicalIndexStart-1+i].pe;
223 #if DEBUG
224     CkPrintf("Tree logical index %d sending to PE %d\n", myIndex+1, pelist[i]);
225 #endif
226     CkAssert(pelist[i] < CkNumPes());
227   }
228   
229 }
230
231
232
233
234 void OneTimeNodeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
235   const int myPe = CkMyPe();
236
237   std::set<int> nodePERepresentatives;
238   
239   // create a list of PEs, with one for each node to which the message must be sent
240   for(int i=0; i<totalDestPEs; i++){
241     int pe = destPEs[i].pe;
242     int representative = CmiGetFirstPeOnPhysicalNode(pe);
243     nodePERepresentatives.insert(representative);    
244   }
245   
246   int numRepresentativePEs = nodePERepresentatives.size();
247
248   CkPrintf("Multicasting to %d PEs on %d physical nodes\n", totalDestPEs, numRepresentativePEs );
249   fflush(stdout);
250
251   int repForMyPe = CmiGetFirstPeOnPhysicalNode(CkMyPe());
252   
253   if(CkMyPe() == repForMyPe){
254     // This representative PE for the node should forward on this message along the tree, and deliver to local PEs
255     
256     
257     // flatten the data structure for nodePERepresentatives
258     int *repPeList = new int[numRepresentativePEs];
259     int myRepIndex = -1;
260     std::set<int>::iterator iter;
261     int p=0;
262     for(iter=nodePERepresentatives.begin(); iter != nodePERepresentatives.end(); iter++){
263       repPeList[p] = *iter;
264       if(*iter == repForMyPe)
265         myRepIndex = p;
266       p++;
267     }
268     CkAssert(myRepIndex >=0);
269     
270     
271        
272     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
273     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
274     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
275     
276     if(sendLogicalIndexEnd-1 >= totalDestPEs){
277       sendLogicalIndexEnd = totalDestPEs;
278     }
279     
280     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
281     int numSendLocal = CmiNumPesOnPhysicalNode(CkMyPe())-1;
282     
283     CkPrintf("[%d] numSendTree=%d numSendLocal=%d\n", CkMyPe(), numSendTree, numSendLocal);
284     fflush(stdout);
285
286     int numSend = numSendTree + numSendLocal;
287     if(numSend <= 0){
288       npes = 0;
289       return;
290     }
291     
292     npes = numSend;
293     pelist = new int[npes];
294   
295     for(int i=0;i<numSendTree;i++){
296       CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
297       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
298       CkAssert(pelist[i] < CkNumPes());
299     }
300     
301     int num;
302     int *pelist;
303     CmiGetPesOnPhysicalNode(CkMyPe(), &pelist, &num);
304     for(int i=0;i<numSendLocal;i++){
305       pelist[i+numSendTree] = pelist[1+i];
306     }
307
308     
309     char buf[1024];
310     sprintf(buf, "PE %d is sending to PEs: ", CkMyPe() );
311     
312     for(int i=0;i<numSend;i++){
313       sprintf(buf+strlen(buf), "%d ", pelist[i]);
314     }
315     
316     CkPrintf("%s\n", buf);
317         
318   } else {
319     // We are a leaf PE
320     npes = 0;
321     return;
322   }
323
324   
325   
326 }
327
328
329
330 /*@}*/