Fixing some bugs in the new node aware multicast strategy.
[charm.git] / src / ck-com / ComlibStrategy.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementations of ComlibStrategy.h
6 */
7
8 #include "ComlibStrategy.h"
9 #include "register.h"
10
11
12 void CharmStrategy::pup(PUP::er &p) {
13   //Strategy::pup(p);
14     p | nginfo;
15     p | ginfo;
16     p | ainfo;
17     //p | forwardOnMigration;
18     p | mflag;
19     p | onFinish;
20 }
21
22
23
24
25
26 /** 
27     Deliver a message to a set of indices using the array manager. Indices can be local or remote. 
28     
29     An optimization for [nokeep] methods is applied: the message is not copied for each invocation.
30    
31     @return the number of destination objects which were not local (information
32     retrieved from the array/location manager)
33 */
34 int CharmStrategy::deliverToIndices(void *msg, int numDestIdxs, const CkArrayIndexMax* indices ){
35   int count = 0;
36   
37   envelope *env = UsrToEnv(msg);
38   int ep = env->getsetArrayEp();
39   CkUnpackMessage(&env);
40
41   CkArrayID destination_aid = env->getsetArrayMgr();
42   CkArray *a=(CkArray *)_localBranch(destination_aid);
43
44   env->setPacked(0);
45   env->getsetArrayHops()=1;
46   env->setUsed(0);
47
48   //  CkPrintf("Delivering to %d objects\n", numDestIdxs);
49
50   if(numDestIdxs > 0){
51         
52     // SEND to all destination objects except the last one
53     for(int i=0; i<numDestIdxs-1;i++){
54       env->getsetArrayIndex() = indices[i];
55       
56       if(_entryTable[ep]->noKeep)
57         // don't make a copy for [nokeep] entry methods
58         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
59       else {
60         void *newmsg = CkCopyMsg(&msg);
61         count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
62       }
63     }
64     
65     // SEND to the final destination object
66     env->getsetArrayIndex() = indices[numDestIdxs-1];
67     
68     if(_entryTable[ep]->noKeep){
69       count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
70       CmiFree(env); // runtime frees the [nokeep] messages
71     }
72     else {
73       count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
74     }
75     
76   }
77
78   return count;
79 }
80
81
82
83
84
85
86
87
88 void CharmMessageHolder::pup(PUP::er &p) {
89
90     //    CkPrintf("In CharmMessageHolder::pup \n"); 
91
92     MessageHolder::pup(p);
93
94     //Sec ID depends on the message
95     //Currently this pup is only being used for remote messages
96     sec_id = NULL;
97 }
98
99 //PUPable_def(CharmStrategy);
100 PUPable_def(CharmMessageHolder);
101
102 ComlibNodeGroupInfo::ComlibNodeGroupInfo() {
103     isNodeGroup = 0;
104     ngid.setZero();
105 };
106
107 void ComlibNodeGroupInfo::pup(PUP::er &p) {
108     p | isNodeGroup;
109     p | ngid;
110 }
111
112 ComlibGroupInfo::ComlibGroupInfo() {
113     
114     isSrcGroup = 0;
115     isDestGroup = 0;
116     nsrcpes = 0;
117     ndestpes = 0;
118     srcpelist = NULL;
119     destpelist = NULL;
120     sgid.setZero();
121     dgid.setZero();
122 };
123
124 ComlibGroupInfo::~ComlibGroupInfo() {
125     if(nsrcpes > 0 && srcpelist != NULL)
126         delete [] srcpelist;
127
128     if(ndestpes > 0 && destpelist != NULL)
129         delete [] destpelist;
130 }
131
132 void ComlibGroupInfo::pup(PUP::er &p){
133
134     p | sgid;
135     p | dgid;
136     p | nsrcpes;
137     p | ndestpes;
138
139     p | isSrcGroup;
140     p | isDestGroup;
141
142     if(p.isUnpacking()) {
143         if(nsrcpes > 0) 
144             srcpelist = new int[nsrcpes];
145
146         if(ndestpes > 0) 
147             destpelist = new int[ndestpes];
148     }
149
150     if(nsrcpes > 0) 
151         p(srcpelist, nsrcpes);
152
153     if(ndestpes > 0) 
154         p(destpelist, ndestpes);
155 }
156
157 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
158                                          int npes) {
159     this->sgid = gid;
160     srcpelist = pelist;
161     nsrcpes = npes;
162     isSrcGroup = 1;
163
164     if(nsrcpes == 0) {
165         nsrcpes = CkNumPes();
166         srcpelist = new int[nsrcpes];
167         for(int count =0; count < nsrcpes; count ++)
168             srcpelist[count] = count;
169     }
170 }
171
172 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
173                                          int &npes){
174     gid = this->sgid;
175     npes = nsrcpes;
176
177     pelist = new int [nsrcpes];
178     memcpy(pelist, srcpelist, npes * sizeof(int));
179 }
180
181 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid){
182     gid = this->sgid;
183 }
184
185 void ComlibGroupInfo::setDestinationGroup(CkGroupID gid, int *pelist, 
186                                          int npes) {
187     this->dgid = gid;
188     destpelist = pelist;
189     ndestpes = npes;
190     isDestGroup = 1;
191
192     if(ndestpes == 0) {
193         ndestpes = CkNumPes();
194         destpelist = new int[ndestpes];
195         for(int count =0; count < ndestpes; count ++)
196             destpelist[count] = count;
197     }
198 }
199
200 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid, int *&pelist, 
201                                          int &npes) {
202     gid = this->dgid;
203     npes = ndestpes;
204
205     pelist = new int [ndestpes];
206     memcpy(pelist, destpelist, npes * sizeof(int));
207 }
208
209 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid) {
210     gid = this->dgid;
211 }
212
213 int *ComlibGroupInfo::getCombinedCountList() {
214   int *result = new int[CkNumPes()];
215   int i;
216   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
217   if (nsrcpes != 0) {
218     for (i=0; i<nsrcpes; ++i) result[srcpelist[i]] |= 1;
219   } else {
220     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
221   }
222   if (ndestpes != 0) {
223     for (i=0; i<ndestpes; ++i) result[destpelist[i]] |= 2;
224   } else {
225     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
226   }
227   return result;
228 }
229
230 /*
231 void ComlibGroupInfo::getCombinedPeList(int *&pelist, int &npes) {
232     int count = 0;        
233     pelist = 0;
234     npes = 0;
235
236     pelist = new int[CkNumPes()];
237     if(nsrcpes == 0 || ndestpes == 0) {
238         npes = CkNumPes();        
239         for(count = 0; count < CkNumPes(); count ++) 
240             pelist[count] = count;                         
241     }
242     else {        
243         npes = ndestpes;
244         memcpy(pelist, destpelist, npes * sizeof(int));
245         
246         //Add source processors to the destination processors
247         //already obtained
248         for(int count = 0; count < nsrcpes; count++) {
249             int p = srcpelist[count];
250
251             for(count = 0; count < npes; count ++)
252                 if(pelist[count] == p)
253                     break;
254
255             if(count == npes)
256                 pelist[npes ++] = p;
257         }                        
258     }
259 }
260 */
261
262 ComlibArrayInfo::ComlibArrayInfo() {
263         
264     src_aid.setZero();
265     //nSrcIndices = -1;
266     //src_elements = NULL;
267     isAllSrc = 0;
268     totalSrc = 0;
269     isSrcArray = 0;
270
271     dest_aid.setZero();
272     //nDestIndices = -1;
273     //dest_elements = NULL;
274     isAllDest = 0;
275     totalDest = 0;
276     isDestArray = 0;
277 };
278
279 /*
280 ComlibArrayInfo::~ComlibArrayInfo() {
281     //CkPrintf("in comlibarrayinfo destructor\n");
282
283     if(nSrcIndices > 0)
284         delete [] src_elements;
285
286     if(nDestIndices > 0)
287         delete [] dest_elements;
288 }
289 */
290
291 /// Set the  source array used for this strategy. 
292 /// The list of array indices should be the whole portion of the array involved in the strategy.
293 /// The non-local array elements will be cleaned up inside purge() at migration of the strategy
294 void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
295                                          CkArrayIndexMax *e, int nind){
296     src_aid = aid;
297     isSrcArray = 1;
298     /*
299     nSrcIndices = nind;
300     if(nind > 0) {
301         src_elements = new CkArrayIndexMax[nind];
302         memcpy(src_elements, e, sizeof(CkArrayIndexMax) * nind);
303     }
304     */
305     src_elements.removeAll();
306     
307     for (int i=0; i<nind; ++i) src_elements.push_back(e[i]);
308     
309     if (src_elements.size() == 0) 
310         isAllSrc = 1;
311     else 
312         isAllSrc = 0;
313     
314     totalSrc = src_elements.size();
315     
316 }
317
318
319 void ComlibArrayInfo::getSourceArray(CkArrayID &aid, 
320                                          CkArrayIndexMax *&e, int &nind){
321     aid = src_aid;
322     nind = src_elements.length();//nSrcIndices;
323     e = src_elements.getVec();//src_elements;
324 }
325
326
327 void ComlibArrayInfo::setDestinationArray(CkArrayID aid, 
328                                           CkArrayIndexMax *e, int nind){
329     dest_aid = aid;
330     isDestArray = 1;
331     /*
332     nDestIndices = nind;
333     if(nind > 0) {
334         dest_elements = new CkArrayIndexMax[nind];
335         memcpy(dest_elements, e, sizeof(CkArrayIndexMax) * nind);
336     }
337     */
338     dest_elements.removeAll();
339     for (int i=0; i<nind; ++i) dest_elements.push_back(e[i]);
340
341     if (dest_elements.size() == 0) 
342         isAllDest = 1;
343     else 
344         isAllDest = 0;
345     
346     totalDest = dest_elements.size();
347     
348 }
349
350
351 void ComlibArrayInfo::getDestinationArray(CkArrayID &aid, 
352                                           CkArrayIndexMax *&e, int &nind){
353     aid = dest_aid;
354     nind = dest_elements.length();
355     e = dest_elements.getVec();
356 }
357
358 /// @TODO fix the pup!
359 //Each strategy must define his own Pup interface.
360 void ComlibArrayInfo::pup(PUP::er &p){ 
361     p | src_aid;
362     //p | nSrcIndices;
363     p | isSrcArray;
364     p | isAllSrc;
365     p | totalSrc;
366     p | src_elements;
367     
368     p | dest_aid;
369     //p | nDestIndices;
370     p | isDestArray;
371     p | isAllDest;
372     p | totalDest;
373     p | dest_elements;
374
375     if (p.isPacking() || p.isUnpacking()) {
376       // calling purge both during packing (at the end) and during unpacking
377       // allows this code to be executed both on processor 0 (where the object
378       // is created) and on every other processor where it arrives through PUP.
379       purge();
380     }
381
382     /*    
383     if(p.isUnpacking() && nSrcIndices > 0) 
384         src_elements = new CkArrayIndexMax[nSrcIndices];
385     
386     if(p.isUnpacking() && nDestIndices > 0) 
387         dest_elements = new CkArrayIndexMax[nDestIndices];        
388     
389     if(nSrcIndices > 0)
390         p((char *)src_elements, nSrcIndices * sizeof(CkArrayIndexMax));    
391     else
392         src_elements = NULL;
393
394     if(nDestIndices > 0)
395         p((char *)dest_elements, nDestIndices * sizeof(CkArrayIndexMax));    
396     else
397         dest_elements = NULL;
398
399     localDestIndexVec.resize(0);
400     */
401     
402 }
403
404 void ComlibArrayInfo::newElement(CkArrayID &id, const CkArrayIndex &idx) {
405   ComlibPrintf("ComlibArrayInfo::newElement\n");
406   if (isAllSrc && id==src_aid) src_elements.push_back(idx);
407   if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
408 }
409
410 void ComlibArrayInfo::purge() {
411         int i;
412         CkArray *a;
413 //      ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
414         if (isSrcArray) {
415                 a = (CkArray *)_localBranch(src_aid);
416                 if (isAllSrc) {
417                         // gather the information of all the elements that are currenly present here
418                         ComlibElementIterator iter(&src_elements);
419                         a->getLocMgr()->iterate(iter);
420
421                         // register to ComlibArrayListener for this array id
422 //                      ComlibManagerRegisterArrayListener(src_aid, this);
423                 } else {
424                         // delete all the elements of which we are not homePe
425                         for (i=src_elements.size()-1; i>=0; --i) {
426                                 if (a->homePe(src_elements[i]) != CkMyPe()) { 
427                                         
428 //                                      ComlibPrintf("[%d] removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
429                                         src_elements.remove(i); 
430                                 }
431                         }
432                 }
433         }
434         if (isDestArray) {
435                 a = (CkArray *)_localBranch(dest_aid);
436                 if (isAllDest) {
437                         // gather the information of all the elements that are currenly present here
438                         ComlibElementIterator iter(&dest_elements);
439                         a->getLocMgr()->iterate(iter);
440
441                         // register to ComlibArrayListener for this array id
442 //                      ComlibManagerRegisterArrayListener(dest_aid, this);
443                 } else {
444                         // delete all the elements of which we are not homePe
445                         for (i=dest_elements.size()-1; i>=0; --i) {
446                                 if (a->homePe(dest_elements[i]) != CkMyPe()) {
447 //                                      ComlibPrintf("[%d] removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
448                                         dest_elements.remove(i); 
449                                 }
450                         }
451                 }
452         }
453 }
454
455 int *ComlibArrayInfo::getCombinedCountList() {
456   int *result = new int[CkNumPes()];
457   int i;
458   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
459   CkArray *a = (CkArray *)_localBranch(src_aid);
460   if (src_elements.size() != 0) {
461     for (i=0; i<src_elements.size(); ++i) result[a->homePe(src_elements[i])] |= 1;
462   } else {
463     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
464   }
465   a = (CkArray *)_localBranch(dest_aid);
466   if (dest_elements.size() != 0) {
467     for (i=0; i<dest_elements.size(); ++i) result[a->homePe(dest_elements[i])] |= 2;
468   } else {
469     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
470   }
471   return result;
472 }
473
474 /*
475 //Get the list of destination processors
476 void ComlibArrayInfo::getDestinationPeList(int *&destpelist, int &ndestpes) {
477     
478     int count = 0, acount =0;
479     
480     //Destination has not been set
481     if(nDestIndices < 0) {
482         destpelist = 0;
483         ndestpes = 0;
484         return;
485     }
486
487     //Create an array of size CkNumPes()
488     //Inefficient in space
489     ndestpes = CkNumPes();
490     destpelist = new int[ndestpes];
491
492     memset(destpelist, 0, ndestpes * sizeof(int));    
493
494     if(nDestIndices == 0){
495         for(count =0; count < CkNumPes(); count ++) 
496             destpelist[count] = count;             
497         return;
498     }
499
500     ndestpes = 0;
501     CkArray *amgr = CkArrayID::CkLocalBranch(dest_aid);
502
503     //Find the last known processors of the array elements
504     for(acount = 0; acount < nDestIndices; acount++) {
505
506       //int p = ComlibGetLastKnown(dest_aid, dest_elements[acount]); 
507         int p = amgr->lastKnown(dest_elements[acount]);
508         
509         for(count = 0; count < ndestpes; count ++)
510             if(destpelist[count] == p)
511                 break;
512         
513         if(count == ndestpes) {
514             destpelist[ndestpes ++] = p; 
515         }       
516     }                            
517 }
518
519 void ComlibArrayInfo::getSourcePeList(int *&srcpelist, int &nsrcpes) {
520     
521     int count = 0, acount =0;
522
523     if(nSrcIndices < 0) {
524         srcpelist = 0;
525         nsrcpes = 0;
526         return;
527     }
528
529     nsrcpes = CkNumPes();
530     srcpelist = new int[nsrcpes];
531
532     memset(srcpelist, 0, nsrcpes * sizeof(int));    
533
534     if(nSrcIndices == 0){
535         for(count =0; count < CkNumPes(); count ++) 
536             srcpelist[count] = count;             
537         return;
538     }
539
540     nsrcpes = 0;
541     CkArray *amgr = CkArrayID::CkLocalBranch(src_aid);
542
543     for(acount = 0; acount < nSrcIndices; acount++) {
544         
545       //int p = ComlibGetLastKnown(src_aid, src_elements[acount]); 
546         int p = amgr->lastKnown(src_elements[acount]);
547         
548         for(count = 0; count < nsrcpes; count ++)
549             if(srcpelist[count] == p)
550                 break;
551         
552         if(count == nsrcpes) {
553             srcpelist[nsrcpes ++] = p; 
554         }       
555     }                            
556 }
557
558 void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
559
560     int count = 0;        
561     pelist = 0;
562     npes = 0;
563     
564     //Both arrays empty;
565     //Sanity check, this should really not happen
566     if(nSrcIndices < 0 && nDestIndices < 0) {
567         CkAbort("Arrays have not been set\n");
568         return;
569     }
570     
571     //One of them is the entire array Hence set the number of
572     //processors to all Currently does not work for the case where
573     //number of array elements less than number of processors
574     //Will fix it later!
575     if(nSrcIndices == 0 || nDestIndices == 0) {
576         npes = CkNumPes();        
577         pelist = new int[npes];
578         for(count = 0; count < CkNumPes(); count ++) 
579             pelist[count] = count;                         
580     }
581     else {
582         getDestinationPeList(pelist, npes);
583         
584         //Destination has not been set
585         //Strategy does not care about destination
586         //Is an error case
587         if(npes == 0)
588             pelist = new int[CkNumPes()];
589         
590         CkArray *amgr = CkArrayID::CkLocalBranch(src_aid);
591
592         //Add source processors to the destination processors
593         //already obtained
594         for(int acount = 0; acount < nSrcIndices; acount++) {
595           //int p = ComlibGetLastKnown(src_aid, src_elements[acount]);
596             int p = amgr->lastKnown(src_elements[acount]);
597
598             for(count = 0; count < npes; count ++)
599                 if(pelist[count] == p)
600                     break;
601             if(count == npes)
602                 pelist[npes ++] = p;
603         }                        
604     }
605 }
606 */
607
608 /// Broadcast the message to all local elements
609 void ComlibArrayInfo::localBroadcast(envelope *env) {
610   int count = localMulticast(&dest_elements, env);
611   ComlibPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
612
613 //  char buf[100000];
614 //  buf[0] = '\0';
615 //  for(int i=0;i<dest_elements.size();i++){
616 //        sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
617 //  }
618 //  ComlibPrintf("dest_elements = %s\n", buf);
619
620 }
621
622 /**
623   This method multicasts the message to all the indices in vec.  It
624   also takes care to check if the entry method is readonly or not. If
625   readonly (nokeep) the message is not copied.
626
627   It also makes sure that the entry methods are logged in projections
628   and that the array manager is notified about array element
629   migrations.  Hence this function should be used extensively in the
630   communication library strategies
631
632   This method is more general than just ComlibArrayInfo dest_aid since it takes
633   the destination array id directly form the message envelope.
634
635   @return the number of destination objects which were not local (information
636   retrieved from the array/location manager)
637
638   @todo Replace this method with calls to CharmStrategy::deliverToIndices, possibly making it a function that is not part of any class
639
640 */
641
642 #include "register.h"
643 int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
644                                      envelope *env){
645   int count = 0;
646     //Multicast the messages to all elements in vec
647     int nelements = vec->size();
648     if(nelements == 0) {
649         CmiFree(env);
650         return 0;
651     }
652
653     void *msg = EnvToUsr(env);
654     int ep = env->getsetArrayEp();
655     CkUnpackMessage(&env);
656
657     CkArrayID destination_aid = env->getsetArrayMgr();
658     env->setPacked(0);
659     env->getsetArrayHops()=1;
660     env->setUsed(0);
661
662     CkArrayIndexMax idx;
663
664     //ComlibPrintf("sending to %d elements\n",nelements);
665     for(int i = 0; i < nelements-1; i ++){
666         idx = (*vec)[i];
667         //if(com_debug) idx.print();
668
669         env->getsetArrayIndex() = idx;
670         //ComlibPrintf("sending to: "); idx.print();
671         
672         CkArray *a=(CkArray *)_localBranch(destination_aid);
673         if(_entryTable[ep]->noKeep)
674             count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
675         else {
676             void *newmsg = CkCopyMsg(&msg);
677             count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
678         }
679
680     }
681
682     idx = (*vec)[nelements-1];
683     //if(com_debug) idx.print();
684     env->getsetArrayIndex() = idx;
685     //ComlibPrintf("sending to: "); idx.print();
686     
687     CkArray *a=(CkArray *)_localBranch(destination_aid);
688     if(_entryTable[ep]->noKeep) {
689         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
690         CmiFree(env);
691     }
692     else
693         count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
694
695     return count;
696 }
697
698 /** Delivers a message to an array element, making sure that
699     projections is notified */
700 void ComlibArrayInfo::deliver(envelope *env){
701     ComlibPrintf("In ComlibArrayInfo::deliver()\n");
702                 
703     env->setUsed(0);
704     env->getsetArrayHops()=1;
705     CkUnpackMessage(&env);
706     
707     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
708     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
709 }
710
711
712 /*@}*/