merging with main branch
[charm.git] / src / ck-com / MultiRingMulticast.C
1 /**
2    @addtogroup ComlibCharmStrategy
3    @{
4    
5    @file 
6 */
7
8 #include "MultiRingMulticast.h"
9
10
11 //Unlike ring the source here sends two or more messages while all
12 //elements along the ring only send one.
13
14 void MultiRingMulticastStrategy::createObjectOnSrcPe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *pelist) {
15
16   obj->npes = 0;
17   obj->pelist = 0;
18
19   if(npes == 0) return;
20
21   if(npes < 4) {
22     // direct sending, take out ourself from the list!
23     obj->npes = npes;
24     for (int i=0; i<npes; ++i) if (pelist[i].pe == CkMyPe()) obj->npes --;
25     obj->pelist = new int[obj->npes];
26     for (int i=0, count=0; i<npes; ++i) {
27       if (pelist[i].pe != CkMyPe()) {
28         obj->pelist[count] = pelist[i].pe;
29         count++;
30       }
31     }
32     return;
33   }
34     
35   int myid = -1; // getMyId(pelist, npes, CkMyPe());    
36   for (int i=0; i<npes; ++i) {
37     if (pelist[i].pe == CkMyPe()) {
38       myid = i;
39       break;
40     }
41   }
42
43   int breaking = npes/2; /* 0 : breaking-1    is the first ring
44                             breaking : npes-1 is the second ring
45                          */
46
47   int next_id = myid + 1;
48   // wrap nextpe around the ring
49   if(myid < breaking) {
50     if (next_id >= breaking) next_id = 0;
51   } else {
52     if (next_id >= npes) next_id = breaking;
53   }
54     
55   int mid_id;
56   if (myid < breaking) {
57     mid_id = myid + breaking;
58     if (mid_id < breaking) mid_id = breaking;
59   } else {
60     mid_id = myid - breaking;
61     if (mid_id >= breaking) mid_id = 0;
62   }
63   //mid_pe = getMidPe(pelist, npes, CkMyPe());
64     
65   if(pelist[next_id].pe != CkMyPe()) {
66     obj->pelist = new int[2];
67     obj->npes = 2;
68         
69     obj->pelist[0] = pelist[next_id].pe;
70     obj->pelist[1] = pelist[mid_id].pe;
71   }
72   else {
73     CkAbort("Warning Should not be here !!!!!!!!!\n");
74   }
75   
76   //CkPrintf("%d Src = %d Next = %d Mid Pe =%d\n", CkMyPe(), CkMyPe(), pelist[next_id], pelist[mid_id]);
77   
78   return;
79 }
80
81 void MultiRingMulticastStrategy::createObjectOnIntermediatePe(ComlibSectionHashObject *obj, int npes, ComlibMulticastIndexCount *counts, int srcpe) {
82
83   obj->pelist = 0;
84   obj->npes = 0;
85
86   if(npes < 4) return;
87
88   // where are we inside the list?
89   int myid = -1;
90   for (int i=0; i<npes; ++i) {
91     if (counts[i].pe == CkMyPe()) {
92       myid = i;
93       break;
94     }
95   }
96
97   // we must be in the list!
98   CkAssert(myid >= 0 && myid < npes);
99
100   int breaking = npes/2;
101   int srcid = 0;
102   for (int i=0; i<npes; ++i) {
103     if (counts[i].pe == srcpe) {
104       srcid = i;
105       break;
106     }
107   }
108
109   if (srcid < breaking ^ myid < breaking) {
110     // if we are in the two different halves, correct srcid
111     if (srcid < breaking) {
112       srcid += breaking;
113       if (srcid < breaking) srcid = breaking;
114     } else {
115       srcid -= breaking;
116       if (srcid >= breaking) srcid = 0;
117     }
118   }
119   // now srcid is the starting point of this half ring, which could be the
120   // original sender itself (0 if the sender is not part of the recipients),
121   // or the counterpart in the other ring
122
123   int nextid = myid + 1;
124   // wrap nextpe around the ring
125   if(myid < breaking) {
126     if (nextid >= breaking) nextid = 0;
127   }
128   else {
129     if (nextid >= npes) nextid = breaking;
130   }
131
132   if (nextid != srcid) {
133     obj->pelist = new int[1];
134     obj->npes = 1;
135     obj->pelist[0] = counts[nextid].pe;
136   }
137
138   return;
139 }
140
141 /*@}*/