Several new changes
[charm.git] / src / ck-com / RingMulticastStrategy.C
1
2 #include "RingMulticastStrategy.h"
3
4 //Array Constructor
5 RingMulticastStrategy::RingMulticastStrategy(CkArrayID dest_aid)
6     : DirectMulticastStrategy(dest_aid){
7 }
8
9
10 void RingMulticastStrategy::pup(PUP::er &p){
11
12     DirectMulticastStrategy::pup(p);
13 }
14
15 void RingMulticastStrategy::beginProcessing(int  nelements){
16
17     DirectMulticastStrategy::beginProcessing(nelements);
18 }
19
20 ComlibSectionHashObject *RingMulticastStrategy::createObjectOnSrcPe
21 (int nelements, CkArrayIndexMax *elements){
22
23     ComlibSectionHashObject *robj = new ComlibSectionHashObject;
24
25     int next_pe = CkNumPes();
26     int acount = 0;
27     int min_dest = CkNumPes();
28     
29     //Equivalent to sorting the list of destination processors and
30     //sending the message to the next processor greater than MyPE.
31     //If MyPE is the largest processor send it to minpe
32     for(acount = 0; acount < nelements; acount++){
33         
34         CkArrayID dest;
35         int nidx;
36         CkArrayIndexMax *idx_list;        
37         ainfo.getDestinationArray(dest, idx_list, nidx);
38
39         int p = ComlibGetLastKnown(dest, elements[acount]);
40         
41         //Find the smallest destination
42         if(p < min_dest)
43             min_dest = p;
44         
45         //If there is a processor greater than me and less than next_pe
46         //then he is my next_pe
47         if(p > CkMyPe() && next_pe > p) 
48             next_pe = p;       
49
50         if (p == CkMyPe())
51             robj->indices.insertAtEnd(elements[acount]);
52     }
53     
54     //Recycle the destination pelist and start from the begining
55     if(next_pe == CkNumPes() && min_dest != CkMyPe())        
56         next_pe = min_dest;
57     
58     if(next_pe == CkNumPes())
59         next_pe = -1;
60
61     if(next_pe != -1) {
62         robj->pelist = new int[1];
63         robj->npes = 1;
64         robj->pelist[0] = next_pe;
65     }
66     else {
67         robj->pelist = NULL;
68         robj->npes = 0;
69     }        
70     
71     return robj;
72 }
73
74
75 ComlibSectionHashObject *RingMulticastStrategy::createObjectOnIntermediatePe
76 (int nelements, CkArrayIndexMax *elements, int src_pe){
77
78     ComlibSectionHashObject *obj;
79
80     obj = createObjectOnSrcPe(nelements, elements);
81     
82     //here we check if have reached the end of the ring
83     if(obj->npes > 0 && isEndOfRing(*obj->pelist, src_pe)) {
84         delete obj->pelist;
85         obj->pelist = NULL;
86         obj->npes =0;
87     }
88
89     return obj;
90 }
91
92 //We need to end the ring, 
93 //    if next_pe is the same as the source_pe, or
94 //    if next_pe is the first processor in the ring, greater than srouce_pe.
95 //Both these comparisons are done in a 'cyclic' way with wraparounds.
96
97 int RingMulticastStrategy::isEndOfRing(int next_pe, int src_pe){
98     
99     if(next_pe < 0)
100         return 1;
101     
102     ComlibPrintf("[%d] isEndofring %d, %d\n", CkMyPe(), next_pe, src_pe);
103     
104     if(next_pe > CkMyPe()){
105         if(src_pe <= next_pe && src_pe > CkMyPe())
106             return 1;
107         
108         return 0;
109     }
110     
111     if(src_pe > CkMyPe() || src_pe <= next_pe)
112         return 1;
113     
114     return 0;
115 }