Fixing projections message tracing for [nokeep] multicast messages. Projections messa...
[charm.git] / src / ck-com / OneTimeMulticastStrategy.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    @file
5
6 */
7
8
9 #include "OneTimeMulticastStrategy.h"
10 #include <string>
11 #include <set>
12 #include <vector>
13
14 //#define DEBUG 1
15
16 CkpvExtern(CkGroupID, cmgrID);
17
18 OneTimeMulticastStrategy::OneTimeMulticastStrategy()
19   : Strategy(), CharmStrategy() {
20   //  ComlibPrintf("OneTimeMulticastStrategy constructor\n");
21   setType(ARRAY_STRATEGY);
22 }
23
24 OneTimeMulticastStrategy::~OneTimeMulticastStrategy() {
25 }
26
27 void OneTimeMulticastStrategy::pup(PUP::er &p){
28   Strategy::pup(p);
29   CharmStrategy::pup(p);
30 }
31
32
33 /** Called when the user invokes the entry method on the delegated proxy. */
34 void OneTimeMulticastStrategy::insertMessage(CharmMessageHolder *cmsg){
35 #if DEBUG
36   CkPrintf("[%d] OneTimeMulticastStrategy::insertMessage\n", CkMyPe());
37   fflush(stdout);
38 #endif 
39
40   if(cmsg->dest_proc != IS_SECTION_MULTICAST && cmsg->sec_id == NULL) { 
41     CkAbort("OneTimeMulticastStrategy can only be used with an array section proxy");
42   }
43     
44
45   envelope *env = UsrToEnv(cmsg->getCharmMessage());
46   int npes = 1;
47   int pes[1] = {0};
48   _TRACE_CREATION_MULTICAST(env, npes, pes);
49
50 #if DEBUG
51   CkPrintf("[%d] after TRACE_CREATION_MULTICAST menv->event=%d\n", CkMyPe(), (int)env->getEvent());  
52 #endif
53   
54   // Create a multicast message containing all information about remote destination objects 
55   ComlibMulticastMsg * multMsg = sinfo.getNewMulticastMessage(cmsg, 0, getInstance());
56
57
58 #if DEBUG
59     CkPrintf("[%d] after TRACE_CREATION_MULTICAST multMsg->event=%d\n", CkMyPe(), (int)( UsrToEnv(multMsg)->getEvent() ) );  
60 #endif
61
62   // The remote multicast method will send the message to the remote PEs, as specified in multMsg
63   remoteMulticast(multMsg, true);
64
65   // local multicast will re-extract a list of local destination objects (FIXME to make this more efficient)
66   localMulticast(cmsg);
67
68   delete cmsg;    
69 }
70
71
72
73 /** Deliver the message to the local elements. */
74 void OneTimeMulticastStrategy::localMulticast(CharmMessageHolder *cmsg) {
75   double start = CmiWallTimer();
76   CkSectionID *sec_id = cmsg->sec_id;
77   CkVec< CkArrayIndexMax > localIndices;
78   sinfo.getLocalIndices(sec_id->_nElems, sec_id->_elems, sec_id->_cookie.aid, localIndices);
79   deliverToIndices(cmsg->getCharmMessage(), localIndices );
80   traceUserBracketEvent(10000, start, CmiWallTimer());
81 }
82
83
84
85
86
87 /** 
88     Forward multicast message to our successor processors in the spanning tree. 
89     Uses CmiSyncListSendAndFree for delivery to this strategy's OneTimeMulticastStrategy::handleMessage method.
90 */
91 void OneTimeMulticastStrategy::remoteMulticast(ComlibMulticastMsg * multMsg, bool rootPE) {
92   double start = CmiWallTimer();
93
94   envelope *env = UsrToEnv(multMsg);
95     
96   
97   /// The index into the PE list in the message
98   int myIndex = -10000; 
99   const int totalDestPEs = multMsg->nPes;
100   const int myPe = CkMyPe();
101   
102   // Find my index in the list of all destination PEs
103   if(rootPE){
104     myIndex = -1;
105   } else {
106     for (int i=0; i<totalDestPEs; ++i) {
107       if(multMsg->indicesCount[i].pe == myPe){
108         myIndex = i;
109         break;
110       }
111     }
112   }
113   
114   if(myIndex == -10000)
115     CkAbort("My PE was not found in the list of destination PEs in the ComlibMulticastMsg");
116   
117   int npes;
118   int *pelist = NULL;
119
120   if(totalDestPEs > 0)
121     determineNextHopPEs(totalDestPEs, multMsg->indicesCount, myIndex, pelist, npes );
122   else {
123     npes = 0;
124   }
125
126   if(npes == 0) {
127 #if DEBUG
128     CkPrintf("[%d] OneTimeMulticastStrategy::remoteMulticast is not forwarding to any other PEs\n", CkMyPe());
129 #endif
130     traceUserBracketEvent(10001, start, CmiWallTimer());
131     CmiFree(env);
132     return;
133   }
134   
135   //Collect Multicast Statistics
136   RECORD_SENDM_STATS(getInstance(), env->getTotalsize(), pelist, npes);
137   
138
139   CmiSetHandler(env, CkpvAccess(comlib_handler));
140   ((CmiMsgHeaderBasic *) env)->stratid = getInstance();  
141   CkPackMessage(&env);
142
143   double middle = CmiWallTimer();
144
145   
146   // CkPrintf("[%d] before CmiSyncListSendAndFree env->event=%d\n", CkMyPe(), (int)env->getEvent());
147
148
149   CkAssert(npes > 0);
150   CmiSyncListSendAndFree(npes, pelist, env->getTotalsize(), (char*)env);
151   
152   delete[] pelist;
153
154   double end = CmiWallTimer();
155   traceUserBracketEvent(10001, start, middle);
156   traceUserBracketEvent(10002, middle, end);
157   
158 }
159
160
161
162 /** 
163     Receive an incoming multicast message(sent from OneTimeMulticastStrategy::remoteMulticast).
164     Deliver the message to all local elements 
165 */
166 void OneTimeMulticastStrategy::handleMessage(void *msg){
167 #if DEBUG
168   //  CkPrintf("[%d] OneTimeMulticastStrategy::handleMessage\n", CkMyPe());
169 #endif
170   envelope *env = (envelope *)msg;
171   // CkPrintf("[%d] in OneTimeMulticastStrategy::handleMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
172   
173   CkUnpackMessage(&env);
174   
175   // CkPrintf("[%d] in OneTimeMulticastStrategy::handleMessage after CkUnpackMessage env->event=%d\n", CkMyPe(), (int)env->getEvent());
176   
177
178
179   ComlibMulticastMsg* multMsg = (ComlibMulticastMsg*)EnvToUsr(env);
180   
181   // Don't use msg after this point. Instead use the unpacked env
182   
183   RECORD_RECV_STATS(getInstance(), env->getTotalsize(), env->getSrcPe()); // DOESN'T DO ANYTHING IN NEW COMLIB
184   
185   // Deliver to objects marked as local in the message
186   int localElems;
187   envelope *newenv;
188   CkArrayIndexMax *local_idx_list;  
189   sinfo.unpack(env, localElems, local_idx_list, newenv);
190   ComlibMulticastMsg *newmsg = (ComlibMulticastMsg *)EnvToUsr(newenv);  
191
192   //  CkPrintf("[%d] in OneTimeMulticastStrategy::handleMessage before  deliverToIndices newenv->event=%d\n", CkMyPe(), (int)newenv->getEvent());
193
194   deliverToIndices(newmsg, localElems, local_idx_list );
195   
196   // Forward on to other processors if necessary
197   remoteMulticast(multMsg, false);
198
199 }
200
201
202
203
204 void OneTimeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
205   if(myIndex==-1){
206     // We are at a root node of the spanning tree. 
207     // We will forward the message to all other PEs in the destination list.
208     npes = totalDestPEs;
209     
210     pelist = new int[npes];
211     for (int i=0; i<npes; ++i) {
212       pelist[i] = destPEs[i].pe;
213     }
214   } else {
215     // We are at a leaf node of the spanning tree. 
216     npes = 0;
217   }
218   
219 }
220
221
222 void OneTimeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes) {
223   const int myPe = CkMyPe();
224
225   if(myIndex == totalDestPEs-1){
226     // Final PE won't send to anyone
227     npes = 0;
228     return;
229   } else {
230     // All non-final PEs will send to next PE in list
231     npes = 1;
232     pelist = new int[1];
233     pelist[0] = destPEs[myIndex+1].pe;
234   }
235
236 }
237
238
239 void OneTimeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
240   const int myPe = CkMyPe();
241   
242   // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
243   
244   int sendLogicalIndexStart = degree*(myIndex+1) + 1;       // inclusive
245   int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
246   
247   if(sendLogicalIndexEnd-1 >= totalDestPEs){
248     sendLogicalIndexEnd = totalDestPEs;
249   }
250
251   int numSend = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
252   if(numSend <= 0){
253     npes = 0;
254     return;
255   }
256  
257 #if DEBUG
258   if(numSend > 0)
259     CkPrintf("Tree logical index %d sending to logical %d to %d (totalDestPEs excluding root=%d)  numSend=%d\n",
260              myIndex+1, sendLogicalIndexStart, sendLogicalIndexEnd, totalDestPEs, numSend);
261 #endif
262
263   npes = numSend;
264   pelist = new int[npes];
265   
266   for(int i=0;i<numSend;i++){
267     CkAssert(sendLogicalIndexStart-1+i < totalDestPEs);
268     pelist[i] = destPEs[sendLogicalIndexStart-1+i].pe;
269 #if DEBUG
270     CkPrintf("Tree logical index %d sending to PE %d\n", myIndex+1, pelist[i]);
271 #endif
272     CkAssert(pelist[i] < CkNumPes());
273   }
274   
275 }
276
277
278 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
279 int getFirstPeOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
280   int num;
281   int *nodePeList;
282   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
283   
284   for(int i=0;i<num;i++){
285     // Scan destPEs for the pe
286     int p = nodePeList[i];
287     
288     for(int j=0;j<totalDestPEs;j++){
289       if(p == destPEs[j].pe){
290         // found the representative PE for the node that is in the destPEs list
291         return p;
292       }
293     }
294   }
295   
296   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
297   return -1;
298 }
299
300
301 /** Find a unique representative PE for a node containing pe, with the restriction that the returned PE is in the list destPEs. */
302 int getNthPeOnPhysicalNodeFromList(int n, int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
303   int num;
304   int *nodePeList;
305   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
306   
307   int count = 0;
308   int lastFound = -1;
309   
310   // Foreach PE on this physical node
311   for(int i=0;i<num;i++){
312     int p = nodePeList[i];
313     
314     // Scan destPEs for the pe
315     for(int j=0;j<totalDestPEs;j++){
316       if(p == destPEs[j].pe){
317         lastFound = p;
318         if(count==n)
319           return p;
320         count++;
321       }
322     }
323   }
324   
325   if(lastFound != -1)
326     return lastFound;
327
328   CkAbort("ERROR: Could not find an entry for pe in destPEs list.\n");
329   return -1;
330 }
331
332
333 /** List all the PEs from the list that share the physical node */ 
334 std::vector<int> getPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){ 
335    
336   std::vector<int> result; 
337  
338   int num; 
339   int *nodePeList; 
340   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num); 
341   
342   for(int i=0;i<num;i++){ 
343     // Scan destPEs for the pe 
344     int p = nodePeList[i]; 
345     for(int j=0;j<totalDestPEs;j++){ 
346       if(p == destPEs[j].pe){ 
347         // found the representative PE for the node that is in the
348         // destPEs list 
349         result.push_back(p); 
350         break; 
351       } 
352     } 
353   } 
354   
355   return result; 
356 }
357
358
359
360 /** List all the other PEs from the list that share the physical node */
361 std::vector<int> getOtherPesOnPhysicalNodeFromList(int pe, const int totalDestPEs, const ComlibMulticastIndexCount* destPEs){
362   
363   std::vector<int> result;
364
365   int num;
366   int *nodePeList;
367   CmiGetPesOnPhysicalNode(pe, &nodePeList, &num);
368   
369   for(int i=0;i<num;i++){
370     // Scan destPEs for the pe
371     int p = nodePeList[i];
372     if(p != pe){
373       for(int j=0;j<totalDestPEs;j++){
374         if(p == destPEs[j].pe){
375           // found the representative PE for the node that is in the destPEs list
376           result.push_back(p);
377           break;
378         }
379       }
380     }
381   }
382   
383   return result;
384 }
385
386
387 void OneTimeNodeTreeMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
388   const int myPe = CkMyPe();
389
390   std::set<int> nodePERepresentatives;
391   
392   // create a list of PEs, with one for each node to which the message must be sent
393   for(int i=0; i<totalDestPEs; i++){
394     int pe = destPEs[i].pe;
395     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
396     nodePERepresentatives.insert(representative);    
397   }
398   
399   int numRepresentativePEs = nodePERepresentatives.size();
400   
401   int repForMyPe=-1;
402   if(myIndex != -1)
403     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
404   
405 #if DEBUG
406   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
407   fflush(stdout);
408 #endif
409   
410   // If this PE is part of the multicast tree, then it should forward the message along
411   if(CkMyPe() == repForMyPe || myIndex == -1){
412     // I am an internal node in the multicast tree
413     
414     // flatten the data structure for nodePERepresentatives
415     int *repPeList = new int[numRepresentativePEs];
416     int myRepIndex = -1;
417     std::set<int>::iterator iter;
418     int p=0;
419     for(iter=nodePERepresentatives.begin(); iter != nodePERepresentatives.end(); iter++){
420       repPeList[p] = *iter;
421       if(*iter == repForMyPe)
422         myRepIndex = p;
423       p++;
424     }
425     CkAssert(myRepIndex >=0 || myIndex==-1);
426       
427     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
428     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
429     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
430     
431     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
432       sendLogicalIndexEnd = numRepresentativePEs;
433     }
434     
435     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
436     if(numSendTree < 0)
437       numSendTree = 0;
438     
439     std::vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
440     int numSendLocal;
441     if(myIndex == -1)
442       numSendLocal = 0;
443     else 
444       numSendLocal = otherLocalPes.size();
445     
446     
447
448 #if DEBUG
449     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
450     fflush(stdout);
451 #endif
452
453     int numSend = numSendTree + numSendLocal;
454     if(numSend <= 0){
455       npes = 0;
456       return;
457     }
458     
459     npes = numSend;
460     pelist = new int[npes];
461   
462     for(int i=0;i<numSendTree;i++){
463       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
464       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
465       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
466     }
467     
468     delete[] repPeList;
469     repPeList = NULL;
470
471     for(int i=0;i<numSendLocal;i++){
472       pelist[i+numSendTree] = otherLocalPes[i];
473       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
474     }
475     
476     
477 #if DEBUG
478     char buf[1024];
479     sprintf(buf, "PE %d is sending to Remote Node PEs: ", CkMyPe() );
480     for(int i=0;i<numSend;i++){
481       if(i==numSendTree)
482         sprintf(buf+strlen(buf), " and Local To Node PEs: ", pelist[i]);
483
484       sprintf(buf+strlen(buf), "%d ", pelist[i]);
485     }    
486     CkPrintf("%s\n", buf);
487     fflush(stdout);
488 #endif
489         
490   } else {
491     // We are a leaf PE
492     npes = 0;
493     return;
494   }
495
496   
497   
498 }
499
500
501 void OneTimeNodeTreeRingMulticastStrategy::determineNextHopPEs(const int totalDestPEs, const ComlibMulticastIndexCount* destPEs, const int myIndex, int * &pelist, int &npes){
502   const int myPe = CkMyPe();
503
504   std::set<int> nodePERepresentatives;
505   
506   // create a list of PEs, with one for each node to which the message must be sent
507   for(int i=0; i<totalDestPEs; i++){
508     int pe = destPEs[i].pe;
509     int representative = getFirstPeOnPhysicalNodeFromList(pe, totalDestPEs, destPEs);
510     nodePERepresentatives.insert(representative);    
511   }
512   
513   int numRepresentativePEs = nodePERepresentatives.size();
514   
515   int repForMyPe=-1;
516   if(myIndex != -1)
517     repForMyPe = getFirstPeOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
518   
519 #if DEBUG
520   CkPrintf("[%d] Multicasting to %d PEs on %d physical nodes  repForMyPe=%d\n", CkMyPe(), totalDestPEs, numRepresentativePEs, repForMyPe);
521   fflush(stdout);
522 #endif
523   
524   // If this PE is part of the multicast tree, then it should forward the message along
525   if(CkMyPe() == repForMyPe || myIndex == -1){
526     // I am an internal node in the multicast tree
527     
528     // flatten the data structure for nodePERepresentatives
529     int *repPeList = new int[numRepresentativePEs];
530     int myRepIndex = -1;
531     std::set<int>::iterator iter;
532     int p=0;
533     for(iter=nodePERepresentatives.begin(); iter != nodePERepresentatives.end(); iter++){
534       repPeList[p] = *iter;
535       if(*iter == repForMyPe)
536         myRepIndex = p;
537       p++;
538     }
539     CkAssert(myRepIndex >=0 || myIndex==-1);
540       
541     // The logical indices start at 0 = root node. Logical index i corresponds to the entry i+1 in the array of PEs in the message
542     int sendLogicalIndexStart = degree*(myRepIndex+1) + 1;       // inclusive
543     int sendLogicalIndexEnd = sendLogicalIndexStart + degree - 1;   // inclusive
544     
545     if(sendLogicalIndexEnd-1 >= numRepresentativePEs){
546       sendLogicalIndexEnd = numRepresentativePEs;
547     }
548     
549     int numSendTree = sendLogicalIndexEnd - sendLogicalIndexStart + 1;
550     if(numSendTree < 0)
551       numSendTree = 0;
552
553
554     // Send in a ring to the PEs on this node
555     std::vector<int> otherLocalPes = getOtherPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
556     int numSendLocal = 0;
557     if(myIndex == -1)
558       numSendLocal = 0;
559     else {
560       if(otherLocalPes.size() > 0)
561         numSendLocal = 1;
562       else
563         numSendLocal = 0;
564     }
565     
566
567 #if DEBUG
568     CkPrintf("[%d] numSendTree=%d numSendLocal=%d sendLogicalIndexStart=%d sendLogicalIndexEnd=%d\n", CkMyPe(), numSendTree, numSendLocal,  sendLogicalIndexStart, sendLogicalIndexEnd);
569     fflush(stdout);
570 #endif
571
572     int numSend = numSendTree + numSendLocal;
573     if(numSend <= 0){
574       npes = 0;
575       return;
576     }
577     
578     npes = numSend;
579     pelist = new int[npes];
580   
581     for(int i=0;i<numSendTree;i++){
582       CkAssert(sendLogicalIndexStart-1+i < numRepresentativePEs);
583       pelist[i] = repPeList[sendLogicalIndexStart-1+i];
584       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
585     }
586     
587     delete[] repPeList;
588     repPeList = NULL;
589
590     for(int i=0;i<numSendLocal;i++){
591       pelist[i+numSendTree] = otherLocalPes[i];
592       CkAssert(pelist[i] < CkNumPes() && pelist[i] >= 0);
593     }
594     
595     
596 #if DEBUG
597     char buf[1024];
598     sprintf(buf, "PE %d is sending to Remote Node PEs: ", CkMyPe() );
599     for(int i=0;i<numSend;i++){
600       if(i==numSendTree)
601         sprintf(buf+strlen(buf), " and Local To Node PEs: ", pelist[i]);
602
603       sprintf(buf+strlen(buf), "%d ", pelist[i]);
604     }    
605     CkPrintf("%s\n", buf);
606     fflush(stdout);
607 #endif
608         
609   } else {
610     // We are a leaf PE, so forward in a ring to the PEs on this node
611     const std::vector<int> otherLocalPes = getPesOnPhysicalNodeFromList(CkMyPe(), totalDestPEs, destPEs);
612     
613     npes = 0;
614     pelist = new int[1];
615     
616     //    CkPrintf("[%d] otherLocalPes.size=%d\n", CkMyPe(), otherLocalPes.size() ); 
617     const int numOthers = otherLocalPes.size() ;
618     
619     for(int i=0;i<numOthers;i++){
620       if(otherLocalPes[i] == CkMyPe()){
621         // found me in the PE list for this node
622         if(i+1<otherLocalPes.size()){
623           // If we have a successor in the ring
624           pelist[0] = otherLocalPes[i+1];
625           npes = 1;
626         }
627       }
628     }
629     
630     
631 #if DEBUG
632     if(npes==0)
633       CkPrintf("[%d] At end of ring\n", CkMyPe() );
634     else
635       CkPrintf("[%d] sending along ring to %d\n", CkMyPe(), pelist[0] );
636     
637     fflush(stdout);
638 #endif
639     
640     
641   }
642   
643   
644   
645 }
646
647 /*@}*/