New version with migration and forwarding always working. The test program also works.
[charm.git] / src / ck-core / ComlibStrategy.C
1
2 #include "charm++.h"
3 #include "envelope.h"
4
5 //Class that defines the entry methods that a Charm level strategy
6 //must define.  To write a new strategy inherit from this class and
7 //define the virtual methods.  Every strategy can also define its own
8 //constructor and have any number of arguments. Also call the parent
9 //class methods in those methods.
10
11 void CharmStrategy::insertMessage(MessageHolder *mh){
12     insertMessage((CharmMessageHolder *)mh);
13 }
14
15 void CharmStrategy::pup(PUP::er &p) {
16     Strategy::pup(p);
17     p | nginfo;
18     p | ginfo;
19     p | ainfo;
20     p | forwardOnMigration;
21 }
22
23 CharmMessageHolder::CharmMessageHolder(char * msg, int proc) 
24     : MessageHolder((char *)UsrToEnv(msg), proc, 
25                     UsrToEnv(msg)->getTotalsize()){
26     
27     sec_id = NULL;    
28 }
29
30 CharmMessageHolder::~CharmMessageHolder() { 
31 }
32
33 char * CharmMessageHolder::getCharmMessage(){
34     return (char *)EnvToUsr((envelope *) data);
35 }
36
37 void CharmMessageHolder::pup(PUP::er &p) {
38
39     //    CkPrintf("In CharmMessageHolder::pup \n"); 
40
41     MessageHolder::pup(p);
42
43     //Sec ID depends on the message
44     //Currently this pup is only being used for remote messages
45     sec_id = NULL;
46 }
47
48 PUPable_def(CharmStrategy);
49 PUPable_def(CharmMessageHolder);
50
51 ComlibNodeGroupInfo::ComlibNodeGroupInfo() {
52     isNodeGroup = 0;
53     ngid.setZero();
54 };
55
56 void ComlibNodeGroupInfo::pup(PUP::er &p) {
57     p | isNodeGroup;
58     p | ngid;
59 }
60
61 ComlibGroupInfo::ComlibGroupInfo() {
62     
63     isGroup = 0;
64     nsrcpes = 0;
65     srcpelist = NULL;
66     gid.setZero();
67 };
68
69 void ComlibGroupInfo::pup(PUP::er &p){
70
71     p | gid;
72     p | nsrcpes;
73
74     if(p.isUnpacking()) {
75         if(nsrcpes >0) 
76             srcpelist = new int[nsrcpes];
77     }
78
79     if(nsrcpes > 0) 
80         p(srcpelist, nsrcpes);
81 }
82
83 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
84                                          int npes) {
85     this->gid = gid;
86     srcpelist = pelist;
87     nsrcpes = npes;
88     isGroup = 1;
89 }
90
91 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
92                                          int &npes){
93     gid = this->gid;
94
95     pelist = srcpelist;
96     npes = nsrcpes;
97 }
98
99
100 ComlibArrayInfo::ComlibArrayInfo() {
101
102     src_aid.setZero();
103     nSrcIndices = -1;
104     src_elements = NULL;
105     isSrcArray = 0;
106
107     dest_aid.setZero();
108     nDestIndices = -1;
109     dest_elements = NULL;
110     isDestArray = 0;
111 };
112
113 void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
114                                          CkArrayIndexMax *e, int nind){
115     src_aid = aid;
116     isSrcArray = 1;    
117     nSrcIndices = nind;
118     if(nind > 0) {
119         src_elements = new CkArrayIndexMax[nind];
120         memcpy(src_elements, e, sizeof(CkArrayIndexMax) * nind);
121     }
122 }
123
124
125 void ComlibArrayInfo::getSourceArray(CkArrayID &aid, 
126                                          CkArrayIndexMax *&e, int &nind){
127     aid = src_aid;
128     nind = nSrcIndices;
129     e = src_elements;
130 }
131
132
133 void ComlibArrayInfo::setDestinationArray(CkArrayID aid, 
134                                           CkArrayIndexMax *e, int nind){
135     dest_aid = aid;
136     isDestArray = 1;    
137     nDestIndices = nind;
138     if(nind > 0) {
139         dest_elements = new CkArrayIndexMax[nind];
140         memcpy(dest_elements, e, sizeof(CkArrayIndexMax) * nind);
141     }
142 }
143
144
145 void ComlibArrayInfo::getDestinationArray(CkArrayID &aid, 
146                                           CkArrayIndexMax *&e, int &nind){
147     aid = dest_aid;
148     nind = nDestIndices;
149     e = dest_elements;
150 }
151
152
153 //Each strategy must define his own Pup interface.
154 void ComlibArrayInfo::pup(PUP::er &p){ 
155     p | src_aid;
156     p | nSrcIndices;
157     p | isSrcArray;
158     
159     p | dest_aid;
160     p | nDestIndices;
161     p | isDestArray;
162     
163     if(p.isUnpacking() && nSrcIndices > 0) 
164         src_elements = new CkArrayIndexMax[nSrcIndices];
165     
166     if(p.isUnpacking() && nDestIndices > 0) 
167         dest_elements = new CkArrayIndexMax[nDestIndices];        
168     
169     if(nSrcIndices > 0)
170         p((char *)src_elements, nSrcIndices * sizeof(CkArrayIndexMax));    
171     else
172         src_elements = NULL;
173
174     if(nDestIndices > 0)
175         p((char *)dest_elements, nDestIndices * sizeof(CkArrayIndexMax));    
176     else
177         dest_elements = NULL;
178
179     //Insert all local elements into a vector
180     if(p.isUnpacking() && !dest_aid.isZero()) {
181         CkArray *dest_array = CkArrayID::CkLocalBranch(dest_aid);
182         
183         if(nDestIndices == 0){            
184             dest_array->getComlibArrayListener()->getLocalIndices
185                 (localDestIndexVec);
186         }
187         else {
188             for(int count = 0; count < nDestIndices; count++) 
189                 if(dest_array->lastKnown(dest_elements[count]) == CkMyPe())
190                     localDestIndexVec.insertAtEnd(dest_elements[count]);
191         }
192     }
193 }
194
195 //Get the list of destination processors
196 void ComlibArrayInfo::getDestinationPeList(int *&destpelist, int &ndestpes) {
197     
198     int count = 0, acount =0;
199     
200     //Destination has not been set
201     if(nDestIndices < 0) {
202         destpelist = 0;
203         ndestpes = 0;
204         return;
205     }
206
207     //Create an array of size CkNumPes()
208     //Inefficient in space
209     ndestpes = CkNumPes();
210     destpelist = new int[ndestpes];
211
212     memset(destpelist, 0, ndestpes * sizeof(int));    
213
214     if(nDestIndices == 0){
215         for(count =0; count < CkNumPes(); count ++) 
216             destpelist[count] = count;             
217         return;
218     }
219
220     ndestpes = 0;
221
222     //Find the last known processors of the array elements
223     for(acount = 0; acount < nDestIndices; acount++) {
224         int p = CkArrayID::CkLocalBranch(dest_aid)->
225             lastKnown(dest_elements[acount]);        
226         
227         for(count = 0; count < ndestpes; count ++)
228             if(destpelist[count] == p)
229                 break;
230         
231         if(count == ndestpes) {
232             destpelist[ndestpes ++] = p; 
233         }       
234     }                            
235 }
236
237 void ComlibArrayInfo::getSourcePeList(int *&srcpelist, int &nsrcpes) {
238     
239     int count = 0, acount =0;
240
241     if(nSrcIndices < 0) {
242         srcpelist = 0;
243         nsrcpes = 0;
244         return;
245     }
246
247     nsrcpes = CkNumPes();
248     srcpelist = new int[nsrcpes];
249
250     memset(srcpelist, 0, nsrcpes * sizeof(int));    
251
252     if(nSrcIndices == 0){
253         for(count =0; count < CkNumPes(); count ++) 
254             srcpelist[count] = count;             
255         return;
256     }
257
258     nsrcpes = 0;
259     for(acount = 0; acount < nSrcIndices; acount++) {
260         int p = CkArrayID::CkLocalBranch(src_aid)->
261             lastKnown(src_elements[acount]);        
262         
263         for(count = 0; count < nsrcpes; count ++)
264             if(srcpelist[count] == p)
265                 break;
266         
267         if(count == nsrcpes) {
268             srcpelist[nsrcpes ++] = p; 
269         }       
270     }                            
271 }
272
273 void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
274
275     int count = 0;        
276     pelist = 0;
277     npes = 0;
278     
279     //Both arrays empty;
280     //Sanity check, this should really not happen
281     if(nSrcIndices < 0 && nDestIndices < 0) {
282         CkAbort("Arrays have not been set\n");
283         return;
284     }
285     
286     //One of them is the entire array Hence set the number of
287     //processors to all Currently does not work for the case where
288     //number of array elements less than number of processors
289     //Will fix it later!
290     if(nSrcIndices == 0 || nDestIndices == 0) {
291         npes = CkNumPes();        
292         pelist = new int[npes];
293         for(count = 0; count < CkNumPes(); count ++) 
294             pelist[count] = count;                         
295     }
296     else {
297         getDestinationPeList(pelist, npes);
298         
299         //Destination has not been set
300         //Strategy does not care about destination
301         //Is an error case
302         if(npes == 0)
303             pelist = new int[CkNumPes()];
304         
305         //Add source processors to the destination processors
306         //already obtained
307         for(int acount = 0; acount < nSrcIndices; acount++) {
308             int p = CkArrayID::CkLocalBranch(src_aid)->
309                 lastKnown(src_elements[acount]);
310             
311             for(count = 0; count < npes; count ++)
312                 if(pelist[count] == p)
313                     break;
314             if(count == npes)
315                 pelist[npes ++] = p;
316         }                        
317     }
318 }
319
320 void ComlibArrayInfo::localBroadcast(envelope *env) {
321     ComlibArrayInfo::localMulticast(&localDestIndexVec, env);
322 }
323
324 /*
325   This method multicasts the message to all the indices in vec.  It
326   also takes care to check if the entry method is readonly or not? If
327   readonly (nokeep) the message is not copied.
328
329   It also makes sure that the entry methods are logged in projections
330   and that the array manager is notified about array element
331   migrations.  Hence this function should be used extensively in the
332   communication library strategies */
333
334 #include "register.h"
335 void ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
336                                      envelope *env){
337
338     //Multicast the messages to all elements in vec
339     int nelements = vec->size();
340     if(nelements == 0) {
341         CmiFree(env);
342         return;
343     }
344
345     void *msg = EnvToUsr(env);
346     int ep = env->getsetArrayEp();
347     CkUnpackMessage(&env);
348
349     CkArrayID dest_aid = env->getsetArrayMgr();
350     env->setPacked(0);
351     env->getsetArrayHops()=1;
352     env->setUsed(0);
353
354     for(int count = 0; count < nelements; count ++){
355         CkArrayIndexMax idx = (*vec)[count];
356         
357         //if(comm_debug) idx.print();
358
359         env->getsetArrayIndex() = idx;
360         
361         CkArray *a=(CkArray *)_localBranch(dest_aid);
362         if(_entryTable[ep]->noKeep)
363             a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CmiFalse);
364         else {
365             void *newmsg = CkCopyMsg(&msg);
366             a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue, CmiTrue);
367         }
368
369     }
370
371     CmiFree(env);
372 }
373
374 /* Delivers a message to an array element, making sure that
375    projections is notified */
376 void ComlibArrayInfo::deliver(envelope *env){
377     
378     env->setUsed(0);
379     env->getsetArrayHops()=1;
380     CkUnpackMessage(&env);
381     
382     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
383     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue, CmiTrue);    
384 }