Removing assert statement that caused strategies only to work with 1d chare arrays.
[charm.git] / src / ck-com / ComlibStrategy.C
1 /**
2    @addtogroup CharmComlib
3    @{
4    @file
5    Implementations of ComlibStrategy.h
6 */
7
8 #include "ComlibStrategy.h"
9 #include "register.h"
10
11
12 void CharmStrategy::pup(PUP::er &p) {
13   //Strategy::pup(p);
14     p | nginfo;
15     p | ginfo;
16     p | ainfo;
17     //p | forwardOnMigration;
18     p | mflag;
19     p | onFinish;
20 }
21
22
23
24
25
26 /** 
27     deliver a message to a set of indices using the array manager. Indices can be local or remote. 
28     
29     An optimization for [nokeep] methods is applied: the message is not copied for each invocation.
30    
31     @return the number of destination objects which were not local (information
32     retrieved from the array/location manager)
33 */
34 int CharmStrategy::deliverToIndices(void *msg, int numDestIdxs, const CkArrayIndexMax* indices ){
35   int count = 0;
36   
37   envelope *env = UsrToEnv(msg);
38   int ep = env->getsetArrayEp();
39   CkUnpackMessage(&env);
40
41   CkArrayID destination_aid = env->getsetArrayMgr();
42   CkArray *a=(CkArray *)_localBranch(destination_aid);
43
44   env->setPacked(0);
45   env->getsetArrayHops()=1;
46   env->setUsed(0);
47
48   //  CkPrintf("Delivering to %d objects\n", numDestIdxs);
49
50   if(numDestIdxs > 0){
51         
52     // SEND to all destination objects except the last one
53     for(int i=0; i<numDestIdxs-1;i++){
54       env->getsetArrayIndex() = indices[i];
55       
56       //      CkPrintf("[%d] in deliverToIndices env->event=%d pe=%d\n", CkMyPe(), (int)env->getEvent(), (int)env->getSrcPe());
57
58       if(_entryTable[ep]->noKeep)
59         // don't make a copy for [nokeep] entry methods
60         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
61       else {
62         void *newmsg = CkCopyMsg(&msg);
63         count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
64       }
65     }
66     
67     // SEND to the final destination object
68     env->getsetArrayIndex() = indices[numDestIdxs-1];
69     
70     if(_entryTable[ep]->noKeep){
71       count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
72       CmiFree(env); // runtime frees the [nokeep] messages
73     }
74     else {
75       count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
76     }
77     
78   }
79
80   return count;
81 }
82
83
84
85
86
87
88
89
90 void CharmMessageHolder::pup(PUP::er &p) {
91
92     //    CkPrintf("In CharmMessageHolder::pup \n"); 
93
94     MessageHolder::pup(p);
95
96     //Sec ID depends on the message
97     //Currently this pup is only being used for remote messages
98     sec_id = NULL;
99 }
100
101 //PUPable_def(CharmStrategy);
102 PUPable_def(CharmMessageHolder);
103
104 ComlibNodeGroupInfo::ComlibNodeGroupInfo() {
105     isNodeGroup = 0;
106     ngid.setZero();
107 };
108
109 void ComlibNodeGroupInfo::pup(PUP::er &p) {
110     p | isNodeGroup;
111     p | ngid;
112 }
113
114 ComlibGroupInfo::ComlibGroupInfo() {
115     
116     isSrcGroup = 0;
117     isDestGroup = 0;
118     nsrcpes = 0;
119     ndestpes = 0;
120     srcpelist = NULL;
121     destpelist = NULL;
122     sgid.setZero();
123     dgid.setZero();
124 };
125
126 ComlibGroupInfo::~ComlibGroupInfo() {
127     if(nsrcpes > 0 && srcpelist != NULL)
128         delete [] srcpelist;
129
130     if(ndestpes > 0 && destpelist != NULL)
131         delete [] destpelist;
132 }
133
134 void ComlibGroupInfo::pup(PUP::er &p){
135
136     p | sgid;
137     p | dgid;
138     p | nsrcpes;
139     p | ndestpes;
140
141     p | isSrcGroup;
142     p | isDestGroup;
143
144     if(p.isUnpacking()) {
145         if(nsrcpes > 0) 
146             srcpelist = new int[nsrcpes];
147
148         if(ndestpes > 0) 
149             destpelist = new int[ndestpes];
150     }
151
152     if(nsrcpes > 0) 
153         p(srcpelist, nsrcpes);
154
155     if(ndestpes > 0) 
156         p(destpelist, ndestpes);
157 }
158
159 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
160                                          int npes) {
161     this->sgid = gid;
162     srcpelist = pelist;
163     nsrcpes = npes;
164     isSrcGroup = 1;
165
166     if(nsrcpes == 0) {
167         nsrcpes = CkNumPes();
168         srcpelist = new int[nsrcpes];
169         for(int count =0; count < nsrcpes; count ++)
170             srcpelist[count] = count;
171     }
172 }
173
174 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
175                                          int &npes){
176     gid = this->sgid;
177     npes = nsrcpes;
178
179     pelist = new int [nsrcpes];
180     memcpy(pelist, srcpelist, npes * sizeof(int));
181 }
182
183 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid){
184     gid = this->sgid;
185 }
186
187 void ComlibGroupInfo::setDestinationGroup(CkGroupID gid, int *pelist, 
188                                          int npes) {
189     this->dgid = gid;
190     destpelist = pelist;
191     ndestpes = npes;
192     isDestGroup = 1;
193
194     if(ndestpes == 0) {
195         ndestpes = CkNumPes();
196         destpelist = new int[ndestpes];
197         for(int count =0; count < ndestpes; count ++)
198             destpelist[count] = count;
199     }
200 }
201
202 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid, int *&pelist, 
203                                          int &npes) {
204     gid = this->dgid;
205     npes = ndestpes;
206
207     pelist = new int [ndestpes];
208     memcpy(pelist, destpelist, npes * sizeof(int));
209 }
210
211 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid) {
212     gid = this->dgid;
213 }
214
215 int *ComlibGroupInfo::getCombinedCountList() {
216   int *result = new int[CkNumPes()];
217   int i;
218   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
219   if (nsrcpes != 0) {
220     for (i=0; i<nsrcpes; ++i) result[srcpelist[i]] |= 1;
221   } else {
222     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
223   }
224   if (ndestpes != 0) {
225     for (i=0; i<ndestpes; ++i) result[destpelist[i]] |= 2;
226   } else {
227     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
228   }
229   return result;
230 }
231
232
233 ComlibArrayInfo::ComlibArrayInfo() {
234         
235     src_aid.setZero();
236     isAllSrc = 0;
237     totalSrc = 0;
238     isSrcArray = 0;
239
240     dest_aid.setZero();
241     isAllDest = 0;
242     totalDest = 0;
243     isDestArray = 0;
244 };
245
246
247 void ComlibArrayInfo::setSourceArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
248     src_aid = aid;
249     isSrcArray = 1;
250
251     src_elements.removeAll();
252     for (int i=0; i<nind; ++i){
253       CkAssert(e[i].nInts == 1);
254       src_elements.push_back(e[i]);
255     }
256     
257     if (nind == 0) 
258         isAllSrc = 1;
259     else 
260         isAllSrc = 0;
261
262     totalSrc = nind;
263
264     CkAssert(src_elements.size() == totalSrc);    
265
266 }
267
268
269 void ComlibArrayInfo::setDestinationArray(CkArrayID aid, CkArrayIndexMax *e, int nind){
270   ComlibPrintf("[%d] ComlibArrayInfo::setDestinationArray  dest_elements\n", CkMyPe());
271     dest_aid = aid;
272     isDestArray = 1;
273    
274     dest_elements.removeAll();
275     for (int i=0; i<nind; ++i){
276       CkAssert(e[i].nInts > 0);
277       dest_elements.push_back(e[i]);
278     }
279
280     if (nind == 0) 
281         isAllDest = 1;
282     else 
283         isAllDest = 0;
284     
285     totalDest = nind;
286     CkAssert(dest_elements.size() == totalDest);    
287
288 }
289
290
291 /// @TODO fix the pup!
292 //Each strategy must define his own Pup interface.
293 void ComlibArrayInfo::pup(PUP::er &p){ 
294     p | src_aid;
295     p | isSrcArray;
296     p | isAllSrc;
297     p | totalSrc;
298     p | src_elements; 
299     p | new_src_elements;
300
301     p | dest_aid;
302     p | isDestArray;
303     p | isAllDest;
304     p | totalDest;
305     p | dest_elements; 
306     p | new_dest_elements;
307
308     if (p.isPacking() || p.isUnpacking()) {
309       // calling purge both during packing (at the end) and during unpacking
310       // allows this code to be executed both on processor 0 (where the object
311       // is created) and on every other processor where it arrives through PUP.
312       purge();
313     }
314
315     
316 }
317
318
319 void ComlibArrayInfo::printDestElementList() {
320   char buf[100000];
321   buf[0] = '\0';
322   for(int i=0;i<dest_elements.size();i++){
323     sprintf(buf+strlen(buf), " %d", dest_elements[i].data()[0]);
324   }
325   CkPrintf("[%d] dest_elements = %s\n", CkMyPe(), buf);
326 }
327
328
329 void ComlibArrayInfo::newElement(CkArrayID &id, const CkArrayIndex &idx) {
330   CkAbort("New Comlib implementation does not allow dynamic element insertion yet\n");
331   //  CkPrintf("ComlibArrayInfo::newElement dest_elements\n");
332   //  if (isAllSrc && id==src_aid) src_elements.push_back(idx);
333   //  if (isAllDest && id==dest_aid) dest_elements.push_back(idx);
334 }
335
336 void ComlibArrayInfo::purge() {
337   //    ComlibPrintf("[%d] ComlibArrayInfo::purge srcArray=%d (%d), destArray=%d (%d)\n",CkMyPe(),isSrcArray,isAllSrc,isDestArray,isAllDest);
338
339   if (isSrcArray) {
340     CkArray *a = (CkArray *)_localBranch(src_aid);
341
342     // delete all the source elements for which we are not homePe
343     for (int i=src_elements.size()-1; i>=0; --i) {
344       if (a->homePe(src_elements[i]) != CkMyPe()) {                     
345         ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d src element %d  i=%d\n", CkMyPe(),a->homePe(src_elements[i]), src_elements[i].data()[0], i);
346         src_elements.remove(i); 
347       }
348     }
349   }
350
351   if (isDestArray) {
352     CkArray *a = (CkArray *)_localBranch(dest_aid);
353         
354     // delete all the destination elements for which we are not homePe
355     for (int i=dest_elements.size()-1; i>=0; --i) {
356       if (a->homePe(dest_elements[i]) != CkMyPe()) {
357         ComlibPrintf("[%d] ComlibArrayInfo::purge removing home=%d dest element %d  i=%d\n", CkMyPe(), a->homePe(dest_elements[i]), dest_elements[i].data()[0], i);
358         dest_elements.remove(i); 
359       }
360     }           
361   }
362
363 }
364
365 int *ComlibArrayInfo::getCombinedCountList() {
366   int *result = new int[CkNumPes()];
367   int i;
368   for (i=0; i<CkNumPes(); ++i) result[i] = 0;
369   CkArray *a = (CkArray *)_localBranch(src_aid);
370   if (src_elements.size() != 0) {
371     for (i=0; i<src_elements.size(); ++i) result[a->homePe(src_elements[i])] |= 1;
372   } else {
373     for (i=0; i<CkNumPes(); ++i) result[i] |= 1;
374   }
375   a = (CkArray *)_localBranch(dest_aid);
376   if (dest_elements.size() != 0) {
377     for (i=0; i<dest_elements.size(); ++i) result[a->homePe(dest_elements[i])] |= 2;
378   } else {
379     for (i=0; i<CkNumPes(); ++i) result[i] |= 2;
380   }
381   return result;
382 }
383
384
385
386 /**  Broadcast the message to all local elements (as listed in dest_elements) */
387 void ComlibArrayInfo::localBroadcast(envelope *env) {
388   int count = localMulticast(&dest_elements, env);
389   if(com_debug){
390     CkPrintf("[%d] ComlibArrayInfo::localBroadcast to %d elements (%d non local)\n",CmiMyPe(),dest_elements.size(),count);
391     printDestElementList();
392   }
393
394 }
395
396
397
398 /**
399   This method multicasts the message to all the indices in vec.  It
400   also takes care to check if the entry method is readonly or not. If
401   readonly (nokeep) the message is not copied.
402
403   It also makes sure that the entry methods are logged in projections
404   and that the array manager is notified about array element
405   migrations.  Hence this function should be used extensively in the
406   communication library strategies
407
408   This method is more general than just ComlibArrayInfo dest_aid since it takes
409   the destination array id directly form the message envelope.
410
411   @return the number of destination objects which were not local (information
412   retrieved from the array/location manager)
413
414   @todo Replace this method with calls to CharmStrategy::deliverToIndices, possibly making it a function that is not part of any class
415
416 */
417 #include "register.h"
418 int ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
419                                      envelope *env){
420   int count = 0;
421     //Multicast the messages to all elements in vec
422     int nelements = vec->size();
423     if(nelements == 0) {
424         CmiFree(env);
425         return 0;
426     }
427
428     void *msg = EnvToUsr(env);
429     int ep = env->getsetArrayEp();
430     CkUnpackMessage(&env);
431
432     CkArrayID destination_aid = env->getsetArrayMgr();
433     env->setPacked(0);
434     env->getsetArrayHops()=1;
435     env->setUsed(0);
436
437     CkArrayIndexMax idx;
438
439     //ComlibPrintf("sending to %d elements\n",nelements);
440     for(int i = 0; i < nelements-1; i ++){
441       idx = (*vec)[i];
442         //if(com_debug) idx.print();
443
444         env->getsetArrayIndex() = idx;
445         //ComlibPrintf("sending to: "); idx.print();
446         
447         CkArray *a=(CkArray *)_localBranch(destination_aid);
448         if(_entryTable[ep]->noKeep)
449             count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
450         else {
451             void *newmsg = CkCopyMsg(&msg);
452             count += a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
453         }
454
455     }
456
457     idx = (*vec)[nelements-1];
458     //if(com_debug) idx.print();
459     env->getsetArrayIndex() = idx;
460     //ComlibPrintf("sending to: "); idx.print();
461     
462     CkArray *a=(CkArray *)_localBranch(destination_aid);
463     if(_entryTable[ep]->noKeep) {
464         count += a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
465         CmiFree(env);
466     }
467     else
468         count += a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
469
470     return count;
471 }
472
473 /** Delivers a message to an array element, making sure that
474     projections is notified */
475 void ComlibArrayInfo::deliver(envelope *env){
476     ComlibPrintf("In ComlibArrayInfo::deliver()\n");
477                 
478     env->setUsed(0);
479     env->getsetArrayHops()=1;
480     CkUnpackMessage(&env);
481     
482     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
483     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
484 }
485
486
487 /*@}*/