added a callback functionality at the end of an iteration
[charm.git] / src / ck-com / ComlibStrategy.C
1
2 #include "charm++.h"
3 #include "envelope.h"
4
5 //calls ComlibNotifyMigrationDone(). Even compiles when -module comlib
6 //is not included. Hack to make loadbalancer work without comlib
7 //currently.
8 CkpvDeclare(int, migrationDoneHandlerID);
9
10 //Class that defines the entry methods that a Charm level strategy
11 //must define.  To write a new strategy inherit from this class and
12 //define the virtual methods.  Every strategy can also define its own
13 //constructor and have any number of arguments. Also call the parent
14 //class methods in those methods.
15
16 void CharmStrategy::insertMessage(MessageHolder *mh){
17     insertMessage((CharmMessageHolder *)mh);
18 }
19
20 void CharmStrategy::pup(PUP::er &p) {
21     Strategy::pup(p);
22     p | nginfo;
23     p | ginfo;
24     p | ainfo;
25     p | forwardOnMigration;
26     p | mflag;
27     p | onFinish;
28 }
29
30 CharmMessageHolder::CharmMessageHolder(char * msg, int proc) 
31     : MessageHolder((char *)UsrToEnv(msg), proc, 
32                     UsrToEnv(msg)->getTotalsize()){
33     
34     sec_id = NULL;    
35 }
36
37 CharmMessageHolder::~CharmMessageHolder() { 
38 }
39
40 void CharmMessageHolder::pup(PUP::er &p) {
41
42     //    CkPrintf("In CharmMessageHolder::pup \n"); 
43
44     MessageHolder::pup(p);
45
46     //Sec ID depends on the message
47     //Currently this pup is only being used for remote messages
48     sec_id = NULL;
49 }
50
51 PUPable_def(CharmStrategy);
52 PUPable_def(CharmMessageHolder);
53
54 ComlibNodeGroupInfo::ComlibNodeGroupInfo() {
55     isNodeGroup = 0;
56     ngid.setZero();
57 };
58
59 void ComlibNodeGroupInfo::pup(PUP::er &p) {
60     p | isNodeGroup;
61     p | ngid;
62 }
63
64 ComlibGroupInfo::ComlibGroupInfo() {
65     
66     isSrcGroup = 0;
67     isDestGroup = 0;
68     nsrcpes = 0;
69     ndestpes = 0;
70     srcpelist = NULL;
71     destpelist = NULL;
72     sgid.setZero();
73     dgid.setZero();
74 };
75
76 ComlibGroupInfo::~ComlibGroupInfo() {
77     if(nsrcpes > 0 && srcpelist != NULL)
78         delete [] srcpelist;
79
80     if(ndestpes > 0 && destpelist != NULL)
81         delete [] destpelist;
82 }
83
84 void ComlibGroupInfo::pup(PUP::er &p){
85
86     p | sgid;
87     p | dgid;
88     p | nsrcpes;
89     p | ndestpes;
90
91     p | isSrcGroup;
92     p | isDestGroup;
93
94     if(p.isUnpacking()) {
95         if(nsrcpes > 0) 
96             srcpelist = new int[nsrcpes];
97
98         if(ndestpes > 0) 
99             destpelist = new int[ndestpes];
100     }
101
102     if(nsrcpes > 0) 
103         p(srcpelist, nsrcpes);
104
105     if(ndestpes > 0) 
106         p(destpelist, ndestpes);
107 }
108
109 void ComlibGroupInfo::setSourceGroup(CkGroupID gid, int *pelist, 
110                                          int npes) {
111     this->sgid = gid;
112     srcpelist = pelist;
113     nsrcpes = npes;
114     isSrcGroup = 1;
115
116     if(nsrcpes == 0) {
117         nsrcpes = CkNumPes();
118         srcpelist = new int[nsrcpes];
119         for(int count =0; count < nsrcpes; count ++)
120             srcpelist[count] = count;
121     }
122 }
123
124 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid, int *&pelist, 
125                                          int &npes){
126     gid = this->sgid;
127     npes = nsrcpes;
128
129     pelist = new int [nsrcpes];
130     memcpy(pelist, srcpelist, npes * sizeof(int));
131 }
132
133 void ComlibGroupInfo::getSourceGroup(CkGroupID &gid){
134     gid = this->sgid;
135 }
136
137 void ComlibGroupInfo::setDestinationGroup(CkGroupID gid, int *pelist, 
138                                          int npes) {
139     this->dgid = gid;
140     destpelist = pelist;
141     ndestpes = npes;
142     isDestGroup = 1;
143
144     if(ndestpes == 0) {
145         ndestpes = CkNumPes();
146         destpelist = new int[ndestpes];
147         for(int count =0; count < ndestpes; count ++)
148             destpelist[count] = count;
149     }
150 }
151
152 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid, int *&pelist, 
153                                          int &npes){
154     gid = this->dgid;
155     npes = ndestpes;
156
157     pelist = new int [ndestpes];
158     memcpy(pelist, destpelist, npes * sizeof(int));
159 }
160
161 void ComlibGroupInfo::getDestinationGroup(CkGroupID &gid){
162     gid = this->dgid;
163 }
164
165 void ComlibGroupInfo::getCombinedPeList(int *&pelist, int &npes) {
166     int count = 0;        
167     pelist = 0;
168     npes = 0;
169
170     pelist = new int[CkNumPes()];
171     if(nsrcpes == 0 || ndestpes == 0) {
172         npes = CkNumPes();        
173         for(count = 0; count < CkNumPes(); count ++) 
174             pelist[count] = count;                         
175     }
176     else {        
177         npes = ndestpes;
178         memcpy(pelist, destpelist, npes * sizeof(int));
179         
180         //Add source processors to the destination processors
181         //already obtained
182         for(int count = 0; count < nsrcpes; count++) {
183             int p = srcpelist[count];
184
185             for(count = 0; count < npes; count ++)
186                 if(pelist[count] == p)
187                     break;
188
189             if(count == npes)
190                 pelist[npes ++] = p;
191         }                        
192     }
193 }
194
195 ComlibArrayInfo::ComlibArrayInfo() {
196
197     src_aid.setZero();
198     nSrcIndices = -1;
199     src_elements = NULL;
200     isSrcArray = 0;
201
202     dest_aid.setZero();
203     nDestIndices = -1;
204     dest_elements = NULL;
205     isDestArray = 0;
206 };
207
208 ComlibArrayInfo::~ComlibArrayInfo() {
209     //CkPrintf("in comlibarrayinfo destructor\n");
210
211     if(nSrcIndices > 0)
212         delete [] src_elements;
213
214     if(nDestIndices > 0)
215         delete [] dest_elements;
216 }
217
218 void ComlibArrayInfo::setSourceArray(CkArrayID aid, 
219                                          CkArrayIndexMax *e, int nind){
220     src_aid = aid;
221     isSrcArray = 1;    
222     nSrcIndices = nind;
223     if(nind > 0) {
224         src_elements = new CkArrayIndexMax[nind];
225         memcpy(src_elements, e, sizeof(CkArrayIndexMax) * nind);
226     }
227 }
228
229
230 void ComlibArrayInfo::getSourceArray(CkArrayID &aid, 
231                                          CkArrayIndexMax *&e, int &nind){
232     aid = src_aid;
233     nind = nSrcIndices;
234     e = src_elements;
235 }
236
237
238 void ComlibArrayInfo::setDestinationArray(CkArrayID aid, 
239                                           CkArrayIndexMax *e, int nind){
240     dest_aid = aid;
241     isDestArray = 1;    
242     nDestIndices = nind;
243     if(nind > 0) {
244         dest_elements = new CkArrayIndexMax[nind];
245         memcpy(dest_elements, e, sizeof(CkArrayIndexMax) * nind);
246     }
247 }
248
249
250 void ComlibArrayInfo::getDestinationArray(CkArrayID &aid, 
251                                           CkArrayIndexMax *&e, int &nind){
252     aid = dest_aid;
253     nind = nDestIndices;
254     e = dest_elements;
255 }
256
257
258 //Each strategy must define his own Pup interface.
259 void ComlibArrayInfo::pup(PUP::er &p){ 
260     p | src_aid;
261     p | nSrcIndices;
262     p | isSrcArray;
263     
264     p | dest_aid;
265     p | nDestIndices;
266     p | isDestArray;
267     
268     if(p.isUnpacking() && nSrcIndices > 0) 
269         src_elements = new CkArrayIndexMax[nSrcIndices];
270     
271     if(p.isUnpacking() && nDestIndices > 0) 
272         dest_elements = new CkArrayIndexMax[nDestIndices];        
273     
274     if(nSrcIndices > 0)
275         p((char *)src_elements, nSrcIndices * sizeof(CkArrayIndexMax));    
276     else
277         src_elements = NULL;
278
279     if(nDestIndices > 0)
280         p((char *)dest_elements, nDestIndices * sizeof(CkArrayIndexMax));    
281     else
282         dest_elements = NULL;
283
284     localDestIndexVec.resize(0);
285 }
286
287 //Get the list of destination processors
288 void ComlibArrayInfo::getDestinationPeList(int *&destpelist, int &ndestpes) {
289     
290     int count = 0, acount =0;
291     
292     //Destination has not been set
293     if(nDestIndices < 0) {
294         destpelist = 0;
295         ndestpes = 0;
296         return;
297     }
298
299     //Create an array of size CkNumPes()
300     //Inefficient in space
301     ndestpes = CkNumPes();
302     destpelist = new int[ndestpes];
303
304     memset(destpelist, 0, ndestpes * sizeof(int));    
305
306     if(nDestIndices == 0){
307         for(count =0; count < CkNumPes(); count ++) 
308             destpelist[count] = count;             
309         return;
310     }
311
312     ndestpes = 0;
313
314     //Find the last known processors of the array elements
315     for(acount = 0; acount < nDestIndices; acount++) {
316
317         int p = ComlibGetLastKnown(dest_aid, dest_elements[acount]); 
318         
319         for(count = 0; count < ndestpes; count ++)
320             if(destpelist[count] == p)
321                 break;
322         
323         if(count == ndestpes) {
324             destpelist[ndestpes ++] = p; 
325         }       
326     }                            
327 }
328
329 void ComlibArrayInfo::getSourcePeList(int *&srcpelist, int &nsrcpes) {
330     
331     int count = 0, acount =0;
332
333     if(nSrcIndices < 0) {
334         srcpelist = 0;
335         nsrcpes = 0;
336         return;
337     }
338
339     nsrcpes = CkNumPes();
340     srcpelist = new int[nsrcpes];
341
342     memset(srcpelist, 0, nsrcpes * sizeof(int));    
343
344     if(nSrcIndices == 0){
345         for(count =0; count < CkNumPes(); count ++) 
346             srcpelist[count] = count;             
347         return;
348     }
349
350     nsrcpes = 0;
351     for(acount = 0; acount < nSrcIndices; acount++) {
352         
353         int p = ComlibGetLastKnown(src_aid, src_elements[acount]); 
354         
355         for(count = 0; count < nsrcpes; count ++)
356             if(srcpelist[count] == p)
357                 break;
358         
359         if(count == nsrcpes) {
360             srcpelist[nsrcpes ++] = p; 
361         }       
362     }                            
363 }
364
365 void ComlibArrayInfo::getCombinedPeList(int *&pelist, int &npes) {
366
367     int count = 0;        
368     pelist = 0;
369     npes = 0;
370     
371     //Both arrays empty;
372     //Sanity check, this should really not happen
373     if(nSrcIndices < 0 && nDestIndices < 0) {
374         CkAbort("Arrays have not been set\n");
375         return;
376     }
377     
378     //One of them is the entire array Hence set the number of
379     //processors to all Currently does not work for the case where
380     //number of array elements less than number of processors
381     //Will fix it later!
382     if(nSrcIndices == 0 || nDestIndices == 0) {
383         npes = CkNumPes();        
384         pelist = new int[npes];
385         for(count = 0; count < CkNumPes(); count ++) 
386             pelist[count] = count;                         
387     }
388     else {
389         getDestinationPeList(pelist, npes);
390         
391         //Destination has not been set
392         //Strategy does not care about destination
393         //Is an error case
394         if(npes == 0)
395             pelist = new int[CkNumPes()];
396         
397         //Add source processors to the destination processors
398         //already obtained
399         for(int acount = 0; acount < nSrcIndices; acount++) {
400             int p = ComlibGetLastKnown(src_aid, src_elements[acount]);
401
402             for(count = 0; count < npes; count ++)
403                 if(pelist[count] == p)
404                     break;
405             if(count == npes)
406                 pelist[npes ++] = p;
407         }                        
408     }
409 }
410
411 void ComlibArrayInfo::localBroadcast(envelope *env) {
412     //Insert all local elements into a vector
413     if(localDestIndexVec.size()==0 && !dest_aid.isZero()) {
414         CkArray *dest_array = CkArrayID::CkLocalBranch(dest_aid);
415         
416         if(nDestIndices == 0){            
417             dest_array->getComlibArrayListener()->getLocalIndices
418                 (localDestIndexVec);
419         }
420         else {
421             for(int count = 0; count < nDestIndices; count++) {
422                 if(ComlibGetLastKnown(dest_aid, dest_elements[count])
423                    == CkMyPe())
424                     localDestIndexVec.insertAtEnd(dest_elements[count]);
425             }
426         }
427     }
428
429     ComlibArrayInfo::localMulticast(&localDestIndexVec, env);
430 }
431
432 /*
433   This method multicasts the message to all the indices in vec.  It
434   also takes care to check if the entry method is readonly or not? If
435   readonly (nokeep) the message is not copied.
436
437   It also makes sure that the entry methods are logged in projections
438   and that the array manager is notified about array element
439   migrations.  Hence this function should be used extensively in the
440   communication library strategies */
441
442 #include "register.h"
443 void ComlibArrayInfo::localMulticast(CkVec<CkArrayIndexMax>*vec,
444                                      envelope *env){
445
446     //Multicast the messages to all elements in vec
447     int nelements = vec->size();
448     if(nelements == 0) {
449         CmiFree(env);
450         return;
451     }
452
453     void *msg = EnvToUsr(env);
454     int ep = env->getsetArrayEp();
455     CkUnpackMessage(&env);
456
457     CkArrayID dest_aid = env->getsetArrayMgr();
458     env->setPacked(0);
459     env->getsetArrayHops()=1;
460     env->setUsed(0);
461
462     CkArrayIndexMax idx;
463     
464     for(int count = 0; count < nelements-1; count ++){
465         idx = (*vec)[count];
466         //if(comm_debug) idx.print();
467
468         env->getsetArrayIndex() = idx;
469         
470         CkArray *a=(CkArray *)_localBranch(dest_aid);
471         if(_entryTable[ep]->noKeep)
472             a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
473         else {
474             void *newmsg = CkCopyMsg(&msg);
475             a->deliver((CkArrayMessage *)newmsg, CkDeliver_queue);
476         }
477
478     }
479
480     idx = (*vec)[nelements-1];
481     //if(comm_debug) idx.print();
482     env->getsetArrayIndex() = idx;
483     
484     CkArray *a=(CkArray *)_localBranch(dest_aid);
485     if(_entryTable[ep]->noKeep) {
486         a->deliver((CkArrayMessage *)msg, CkDeliver_inline, CK_MSG_KEEP);
487         CmiFree(env);
488     }
489     else
490         a->deliver((CkArrayMessage *)msg, CkDeliver_queue);
491 }
492
493 /* Delivers a message to an array element, making sure that
494    projections is notified */
495 void ComlibArrayInfo::deliver(envelope *env){
496     
497     env->setUsed(0);
498     env->getsetArrayHops()=1;
499     CkUnpackMessage(&env);
500     
501     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
502     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);    
503 }
504
505 void ComlibNotifyMigrationDone() {
506     if(CkpvInitialized(migrationDoneHandlerID)) 
507         if(CkpvAccess(migrationDoneHandlerID) > 0) {
508             char *msg = (char *)CmiAlloc(CmiReservedHeaderSize);
509             CmiSetHandler(msg, CkpvAccess(migrationDoneHandlerID));
510 #if CMK_BLUEGENE_CHARM
511             // bluegene charm should avoid directly calling converse
512             CmiSyncSendAndFree(CkMyPe(), CmiReservedHeaderSize, msg);
513 #else
514             CmiHandleMessage(msg);
515 #endif
516         }
517 }
518
519
520 //Stores the location of many array elements used by the
521 //strategies.  Since hash table returns a reference to the object
522 //and for an int that will be 0, the actual value stored is pe +
523 //CkNumPes so 0 would mean processor -CkNumPes which is invalid.
524 CkpvDeclare(ClibLocationTableType *, locationTable);
525
526 CkpvDeclare(CkArrayIndexMax, cache_index);
527 CkpvDeclare(int, cache_pe);
528 CkpvDeclare(CkArrayID, cache_aid);
529
530 int ComlibGetLastKnown(CkArrayID aid, CkArrayIndexMax idx) {
531     //CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
532     //return (cgproxy.ckLocalBranch())->getLastKnown(aid, idx);
533
534     if(!CpvInitialized(locationTable)) {
535         CkAbort("Uninitialized table\n");
536     }
537     CkAssert(CkpvAccess(locationTable) != NULL);
538
539     
540     if(CkpvAccess(cache_index) == idx && CkpvAccess(cache_aid) == aid)
541         return CkpvAccess(cache_pe);
542     
543     ClibGlobalArrayIndex cidx;
544     cidx.aid = aid;
545     cidx.idx = idx;
546     int pe = CkpvAccess(locationTable)->get(cidx);
547     
548     if(pe == 0) {
549         //Array element does not exist in the table
550         
551         CkArray *array = CkArrayID::CkLocalBranch(aid);
552         pe = array->lastKnown(idx) + CkNumPes();
553         CkpvAccess(locationTable)->put(cidx) = pe;
554     }
555     //CkPrintf("last pe = %d \n", pe - CkNumPes());
556     
557     CkpvAccess(cache_index) = idx;
558     CkpvAccess(cache_aid) = aid;
559     CkpvAccess(cache_pe) = pe - CkNumPes();
560
561     return pe - CkNumPes();
562 }