comlib: Fix non-initialized cookie that breaks multicast strategies
[charm.git] / src / ck-com / ComlibStrategy.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementations of ComlibStrategy.h
6 */
7
8 #include "ComlibStrategy.h"
9 #include "register.h"
10
11
12 void CharmStrategy::pup(PUP::er &p) {
13   //Strategy::pup(p);
14     p | nginfo;
15     p | ginfo;
16     p | ainfo;
17     //p | forwardOnMigration;
18     p | mflag;
19     p | onFinish;
20 }
21
22
23
24
25
26 /** 
27     deliver a message to a set of indices using the array manager. Indices can be local or remote. 
28     
29     An optimization for [nokeep] methods is applied: the message is not copied for each invocation.
30    
31     @return the number of destination objects which were not local (information
32     retrieved from the array/location manager)
33 */
34 int CharmStrategy::deliverToIndices(void *msg, int numDestIdxs, const CkArrayIndexMax* indices ){
35   int count = 0;
36   
37   envelope *env = UsrToEnv(msg);
38   int ep = env->getsetArrayEp();
39   CkUnpackMessage(&env);
40
41   CkArrayID destination_aid = env->getsetArrayMgr();
42   CkArray *a=(CkArray *)_localBranch(destination_aid);
43
44   env->setPacked(0);
45   env->getsetArrayHops()=1;
46   env->setUsed(0);
47
48   //  CkPrintf("Delivering to %d objects\n", numDestIdxs);
49
50   if(numDestIdxs > 0){
51         
52     // SEND to all destination objects except the last one
53     for(int i=0; i<numDestIdxs-1;i++){
54       env->getsetArrayIndex() = indices[i];
55       
56       //      CkPrintf("[%d] in deliverToIndices env->event=%d pe=%d\n", CkMyPe(), (int)env->getEvent(), (int)env->getSrcPe());
57
58       if(_entryTable[ep]->noKeep)
59         // don't make a copy for [nokeep] entry methods
60         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
61       else {
62         void *newmsg = CkCopyMsg(&msg);
63         count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
64       }
65     }
66     
67     // SEND to the final destination object
68     env->getsetArrayIndex() = indices[numDestIdxs-1];
69     
70     if(_entryTable[ep]->noKeep){
71       count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
72       CmiFree(env); // runtime frees the [nokeep] messages
73     }
74     else {
75       count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
76     }
77     
78   }
79   else
80     CkFreeMsg(msg);
81
82   return count;
83 }
84
85
86
87
88
89
90
91
92 void CharmMessageHolder::pup(PUP::er &p) {
93
94     //    CkPrintf("In CharmMessageHolder::pup \n"); 
95
96     MessageHolder::pup(p);
97
98     //Sec ID depends on the message
99     //Currently this pup is only being used for remote messages
100     sec_id = NULL;
101 }
102
103 //PUPable_def(CharmStrategy);
104 PUPable_def(CharmMessageHolder)
105
106 ComlibNodeGroupInfo::ComlibNodeGroupInfo() {
107     isNodeGroup = 0;
108     ngid.setZero();
109 }
110
111 void ComlibNodeGroupInfo::pup(PUP::er &p) {
112     p | isNodeGroup;
113     p | ngid;
114 }
115
116 ComlibGroupInfo::ComlibGroupInfo() {
117     
118     isSrcGroup = 0;
119     isDestGroup = 0;
120     nsrcpes = 0;
121     ndestpes = 0;
122     srcpelist = NULL;
123     destpelist = NULL;
124     sgid.setZero();
125     dgid.setZero();
126 }
127
128 ComlibGroupInfo::~ComlibGroupInfo() {
129     if(nsrcpes > 0 && srcpelist != NULL)
130         delete [] srcpelist;
131
132     if(ndestpes > 0 && destpelist != NULL)
133         delete [] destpelist;
134 }
135
136 void ComlibGroupInfo::pup(PUP::er &p){
137
138     p | sgid;
139     p | dgid;
140     p | nsrcpes;
141     p | ndestpes;
142
143     p | isSrcGroup;
144     p | isDestGroup;
145
146     if(p.isUnpacking()) {
147         if(nsrcpes > 0) 
148             srcpelist = new int[nsrcpes];
149
150         if(ndestpes > 0) 
151             destpelist = new int[ndestpes];
152     }
153
154     if(nsrcpes > 0) 
155         p(srcpelist, nsrcpes);
156
157     if(ndestpes > 0) 
158         p(destpelist, ndestpes);
159 }
160
161 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
162                                          int npes) {
163     this->sgid = gid;
164     srcpelist = pelist;
165     nsrcpes = npes;
166     isSrcGroup = 1;
167
168     if(nsrcpes == 0) {
169         nsrcpes = CkNumPes();
170         srcpelist = new int[nsrcpes];
171         for(int count =0; count < nsrcpes; count ++)
172             srcpelist[count] = count;
173     }
174 }
175
176 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
177                                          int &npes){
178     gid = this->sgid;
179     npes = nsrcpes;
180
181     pelist = new int [nsrcpes];
182     memcpy(pelist, srcpelist, npes * sizeof(int));
183 }
184
185 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid){
186     gid = this->sgid;
187 }
188
189 void ComlibGroupInfo::setDestinationGroup(CkGroupID gid, int *pelist, 
190                                          int npes) {
191     this->dgid = gid;
192     destpelist = pelist;
193     ndestpes = npes;
194     isDestGroup = 1;
195
196     if(ndestpes == 0) {
197         ndestpes = CkNumPes();
198         destpelist = new int[ndestpes];
199         for(int count =0; count < ndestpes; count ++)
200             destpelist[count] = count;
201     }
202 }
203
204 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid, int *&pelist, 
205                                          int &npes) {
206     gid = this->dgid;
207     npes = ndestpes;
208
209     pelist = new int [ndestpes];
210     memcpy(pelist, destpelist, npes * sizeof(int));
211 }
212
213 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid) {
214     gid = this->dgid;
215 }
216
217 int *ComlibGroupInfo::getCombinedCountList() {
218   int *result = new int[CkNumPes()];
219   int i;
220   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
221   if (nsrcpes != 0) {
222     for (i=0; i<nsrcpes; ++i) result[srcpelist[i]] |= 1;
223   } else {
224     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
225   }
226   if (ndestpes != 0) {
227     for (i=0; i<ndestpes; ++i) result[destpelist[i]] |= 2;
228   } else {
229     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
230   }
231   return result;
232 }
233
234
235 ComlibArrayInfo::ComlibArrayInfo() {
236         
237     src_aid.setZero();
238     isAllSrc = 0;
239     totalSrc = 0;
240     isSrcArray = 0;
241
242     dest_aid.setZero();
243     isAllDest = 0;
244     totalDest = 0;
245     isDestArray = 0;
246 }
247
248
249 void ComlibArrayInfo::setSourceArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
250     src_aid = aid;
251     isSrcArray = 1;
252
253     src_elements.removeAll();
254     for (int i=0; i<nind; ++i){
255       CkAssert(e[i].nInts == 1);
256       src_elements.push_back(e[i]);
257     }
258     
259     if (nind == 0) 
260         isAllSrc = 1;
261     else 
262         isAllSrc = 0;
263
264     totalSrc = nind;
265
266     CkAssert(src_elements.size() == totalSrc);    
267
268 }
269
270
271 void ComlibArrayInfo::setDestinationArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
272   ComlibPrintf("[%d] ComlibArrayInfo::setDestinationArray  dest_elements\n", CkMyPe());
273     dest_aid = aid;
274     isDestArray = 1;
275    
276     dest_elements.removeAll();
277     for (int i=0; i<nind; ++i){
278       CkAssert(e[i].nInts > 0);
279       dest_elements.push_back(e[i]);
280     }
281
282     if (nind == 0) 
283         isAllDest = 1;
284     else 
285         isAllDest = 0;
286     
287     totalDest = nind;
288     CkAssert(dest_elements.size() == totalDest);    
289
290 }
291
292
293 /// @TODO fix the pup!
294 //Each strategy must define his own Pup interface.
295 void ComlibArrayInfo::pup(PUP::er &p){ 
296     p | src_aid;
297     p | isSrcArray;
298     p | isAllSrc;
299     p | totalSrc;
300     p | src_elements; 
301     p | new_src_elements;
302
303     p | dest_aid;
304     p | isDestArray;
305     p | isAllDest;
306     p | totalDest;
307     p | dest_elements; 
308     p | new_dest_elements;
309
310     if (p.isPacking() || p.isUnpacking()) {
311       // calling purge both during packing (at the end) and during unpacking
312       // allows this code to be executed both on processor 0 (where the object
313       // is created) and on every other processor where it arrives through PUP.
314       purge();
315     }
316
317     
318 }
319
320
321 void ComlibArrayInfo::printDestElementList() {
322   char buf[100000];
323   buf[0] = '\0';
324   for(int i=0;i<dest_elements.size();i++){
325     sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
326   }
327   CkPrintf("[%d] dest_elements = %s\n", CkMyPe(), buf);
328 }
329
330
331 void ComlibArrayInfo::newElement(CkArrayID &id, const CkArrayIndex &idx) {
332   CkAbort("New Comlib implementation does not allow dynamic element insertion yet\n");
333   //  CkPrintf("ComlibArrayInfo::newElement dest_elements\n");
334   //  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
335   //  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
336 }
337
338 void ComlibArrayInfo::purge() {
339   //    ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
340
341   if (isSrcArray) {
342     CkArray *a = (CkArray *)_localBranch(src_aid);
343
344     // delete all the source elements for which we are not homePe
345     for (int i=src_elements.size()-1; i>=0; --i) {
346       if (a->homePe(src_elements[i]) != CkMyPe()) {                     
347         ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
348         src_elements.remove(i); 
349       }
350     }
351   }
352
353   if (isDestArray) {
354     CkArray *a = (CkArray *)_localBranch(dest_aid);
355         
356     // delete all the destination elements for which we are not homePe
357     for (int i=dest_elements.size()-1; i>=0; --i) {
358       if (a->homePe(dest_elements[i]) != CkMyPe()) {
359         ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
360         dest_elements.remove(i); 
361       }
362     }           
363   }
364
365 }
366
367 int *ComlibArrayInfo::getCombinedCountList() {
368   int *result = new int[CkNumPes()];
369   int i;
370   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
371   CkArray *a = (CkArray *)_localBranch(src_aid);
372   if (src_elements.size() != 0) {
373     for (i=0; i<src_elements.size(); ++i) result[a->homePe(src_elements[i])] |= 1;
374   } else {
375     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
376   }
377   a = (CkArray *)_localBranch(dest_aid);
378   if (dest_elements.size() != 0) {
379     for (i=0; i<dest_elements.size(); ++i) result[a->homePe(dest_elements[i])] |= 2;
380   } else {
381     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
382   }
383   return result;
384 }
385
386
387
388 /**  Broadcast the message to all local elements (as listed in dest_elements) */
389 void ComlibArrayInfo::localBroadcast(envelope *env) {
390   int count = localMulticast(&dest_elements, env);
391   if(com_debug){
392     CkPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
393     printDestElementList();
394   }
395
396 }
397
398
399
400 /**
401   This method multicasts the message to all the indices in vec.  It
402   also takes care to check if the entry method is readonly or not. If
403   readonly (nokeep) the message is not copied.
404
405   It also makes sure that the entry methods are logged in projections
406   and that the array manager is notified about array element
407   migrations.  Hence this function should be used extensively in the
408   communication library strategies
409
410   This method is more general than just ComlibArrayInfo dest_aid since it takes
411   the destination array id directly form the message envelope.
412
413   @return the number of destination objects which were not local (information
414   retrieved from the array/location manager)
415
416   @todo Replace this method with calls to CharmStrategy::deliverToIndices, possibly making it a function that is not part of any class
417
418 */
419 #include "register.h"
420 int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
421                                      envelope *env){
422   int count = 0;
423     //Multicast the messages to all elements in vec
424     int nelements = vec->size();
425     if(nelements == 0) {
426         CmiFree(env);
427         return 0;
428     }
429
430     void *msg = EnvToUsr(env);
431     int ep = env->getsetArrayEp();
432     CkUnpackMessage(&env);
433
434     CkArrayID destination_aid = env->getsetArrayMgr();
435     env->setPacked(0);
436     env->getsetArrayHops()=1;
437     env->setUsed(0);
438
439     CkArrayIndexMax idx;
440
441     //ComlibPrintf("sending to %d elements\n",nelements);
442     for(int i = 0; i < nelements-1; i ++){
443       idx = (*vec)[i];
444         //if(com_debug) idx.print();
445
446         env->getsetArrayIndex() = idx;
447         //ComlibPrintf("sending to: "); idx.print();
448         
449         CkArray *a=(CkArray *)_localBranch(destination_aid);
450         if(_entryTable[ep]->noKeep)
451             count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
452         else {
453             void *newmsg = CkCopyMsg(&msg);
454             count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
455         }
456
457     }
458
459     idx = (*vec)[nelements-1];
460     //if(com_debug) idx.print();
461     env->getsetArrayIndex() = idx;
462     //ComlibPrintf("sending to: "); idx.print();
463     
464     CkArray *a=(CkArray *)_localBranch(destination_aid);
465     if(_entryTable[ep]->noKeep) {
466         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
467         CmiFree(env);
468     }
469     else
470         count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
471
472     return count;
473 }
474
475 /** Delivers a message to an array element, making sure that
476     projections is notified */
477 void ComlibArrayInfo::deliver(envelope *env){
478     ComlibPrintf("In ComlibArrayInfo::deliver()\n");
479                 
480     env->setUsed(0);
481     env->getsetArrayHops()=1;
482     CkUnpackMessage(&env);
483     
484     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
485     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
486 }
487
488
489 /*@}*/