a few doxygen documentation
[charm.git] / src / ck-com / ComlibManager.C
1 /**
2    @addtogroup Comlib
3 */
4 /*@{*/
5 #include "ComlibManager.h"
6 #include "EachToManyMulticastStrategy.h"
7 #include "DirectMulticastStrategy.h"
8 #include "StreamingStrategy.h"
9 #include "DummyStrategy.h"
10 #include "MPIStrategy.h"
11 #include "NodeMulticast.h"
12 #include "MsgPacker.h"
13 #include "RingMulticastStrategy.h"
14 #include "MultiRingMulticast.h"
15 #include "PipeBroadcastStrategy.h"
16 #include "BroadcastStrategy.h"
17 #include "MeshStreamingStrategy.h"
18 #include "PrioStreaming.h"
19
20 CkpvExtern(int, RecvdummyHandle);
21
22 CkpvDeclare(int, RecvmsgHandle);
23 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
24 CkpvDeclare(CkGroupID, cmgrID);
25 CkpvExtern(ConvComlibManager *, conv_com_ptr);
26
27 //Handler to receive array messages
28 void recv_array_msg(void *msg){
29
30     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
31
32     if(msg == NULL)
33         return;
34     
35     register envelope* env = (envelope *)msg;
36     env->setUsed(0);
37     env->getsetArrayHops()=1;
38     CkUnpackMessage(&env);
39
40     int srcPe = env->getSrcPe();
41     int sid = ((CmiMsgHeaderExt *) env)->stratid;
42
43     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
44              sid, env->getTotalsize(), srcPe);
45
46     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
47     
48     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
49     //if(!comm_debug)
50     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
51
52     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
53     return;
54 }
55
56 void recv_combined_array_msg(void *msg){
57     if(msg == NULL)
58         return;
59     
60     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
61
62     MsgPacker::deliver((CombinedMessage *)msg);
63 }
64
65 ComlibManager::ComlibManager(){
66     init();
67     ComlibPrintf("In comlibmanager constructor\n");
68 }
69
70 void ComlibManager::init(){
71     
72     initComlibManager();
73
74     PUPable_reg(CharmStrategy);
75     PUPable_reg(CharmMessageHolder);
76     
77     //comm_debug = 1;
78     
79     numStatsReceived = 0;
80     curComlibController = 0;
81     clibIteration = 0;
82     
83     strategyCreated = CmiFalse;
84
85     CkpvInitialize(ClibLocationTableType*, locationTable);
86     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
87
88     CkpvInitialize(CkArrayIndexMax, cache_index);
89     CkpvInitialize(int, cache_pe);
90     CkpvInitialize(CkArrayID, cache_aid);
91
92     CkpvAccess(cache_index).nInts = -1;
93     CkpvAccess(cache_aid).setZero();
94
95     CkpvInitialize(int, RecvmsgHandle);
96     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
97
98     bcast_pelist = new int [CkNumPes()];
99     for(int brcount = 0; brcount < CkNumPes(); brcount++)
100         bcast_pelist[brcount] = brcount;
101
102     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
103     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
104         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
105     
106     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
107     
108     npes = CkNumPes();
109     pelist = NULL;
110
111     CkpvInitialize(CkGroupID, cmgrID);
112     CkpvAccess(cmgrID) = thisgroup;
113
114     dummyArrayIndex.nInts = 0;
115
116     curStratID = 0;
117     prevStratID = -1;
118     //prioEndIterationFlag = 1;
119
120     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
121     
122     receivedTable = 0;
123     setupComplete = 0;
124
125     barrierReached = 0;
126     barrier2Reached = 0;
127
128     bcount = b2count = 0;
129     lbUpdateReceived = CmiFalse;
130
131     isRemote = 0;
132     remotePe = -1;
133
134     CkpvInitialize(int, migrationDoneHandlerID);
135     CkpvAccess(migrationDoneHandlerID) = 
136         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
137     
138     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
139     cgproxy[curComlibController].barrier();
140 }
141
142 //First barrier makes sure that the communication library group 
143 //has been created on all processors
144 void ComlibManager::barrier(){
145     ComlibPrintf("In barrier %d\n", bcount);
146     if(CkMyPe() == 0) {
147         bcount ++;
148         if(bcount == CkNumPes()){
149             bcount = 0;
150             barrierReached = 1;
151             barrier2Reached = 0;
152
153             if(strategyCreated)
154                 broadcastStrategies();
155         }
156     }
157 }
158
159 //Has finished passing the strategy list to all the processors
160 void ComlibManager::barrier2(){
161     if(CkMyPe() == 0) {
162         b2count ++;
163         ComlibPrintf("In barrier2 %d\n", bcount);
164         if(b2count == CkNumPes()) {
165             b2count = 0; 
166             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
167             cgproxy.resumeFromBarrier2();
168         }
169     }
170 }
171
172 //Registers a set of strategies with the communication library
173 ComlibInstanceHandle ComlibManager::createInstance() {
174   
175     CkpvAccess(conv_com_ptr)->nstrats++;    
176     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
177                                CkpvAccess(cmgrID));  
178     return cinst;
179 }
180
181 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
182     
183     strategyCreated = true;
184
185     ListOfStrategies.enq(strat);
186     strat->setInstance(pos);
187 }
188
189 //End of registering function, if barriers have been reached send them over
190 void ComlibManager::broadcastStrategies() {
191     if(!barrierReached)
192       return;    
193
194     lbUpdateReceived = CmiFalse;
195     barrierReached = 0;
196
197     ComlibPrintf("Sending Strategies %d, %d\n", 
198                  CkpvAccess(conv_com_ptr)->nstrats, 
199                  ListOfStrategies.length());
200
201     StrategyWrapper sw;
202     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
203
204     if(ListOfStrategies.length() > 0) {
205         int len = ListOfStrategies.length();
206         sw.s_table = new Strategy* [len];
207         sw.nstrats = len;
208         
209         for (int count=0; count < len; count++)
210             sw.s_table[count] = ListOfStrategies.deq();
211     }
212     else {
213         sw.nstrats = 0;
214         sw.s_table = 0;
215     }
216
217     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
218     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
219 }
220
221 //Called when the array/group element starts sending messages
222 void ComlibManager::beginIteration(){
223     //right now does not do anything might need later
224     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
225     //prioEndIterationFlag = 0;
226 }
227
228 void ComlibManager::setInstance(int instID){
229
230     curStratID = instID;
231     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
232 }
233
234 //called when the array elements has finished sending messages
235 void ComlibManager::endIteration(){
236     //    prioEndIterationFlag = 1;
237     prevStratID = -1;
238     
239     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
240                  curStratID, 
241                  (* strategyTable)[curStratID].elementCount, 
242                  (* strategyTable)[curStratID].numElements);
243
244     if(isRemote) {
245         isRemote = 0;
246         sendRemote();
247         return;
248     }
249
250     if(!receivedTable) {
251         (* strategyTable)[curStratID].nEndItr++;
252         return;
253     }        
254     
255     (* strategyTable)[curStratID].elementCount++;
256     int count = 0;
257     
258     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
259         
260         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
261         
262         if(barrier2Reached) {       
263             (* strategyTable)[curStratID].strategy->doneInserting();
264         }
265         else (* strategyTable)[curStratID].call_doneInserting = 1;
266         
267         (* strategyTable)[curStratID].elementCount = 0;
268     }
269     ComlibPrintf("After EndIteration\n");
270 }
271
272 //receive the list of strategies
273 //Insert the strategies into the strategy table in converse comm lib.
274 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
275 void ComlibManager::receiveTable(StrategyWrapper &sw, 
276                                  CkHashtableT<ClibGlobalArrayIndex, int> 
277                                  &htable)
278 {
279
280     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
281                  clibIteration);
282
283     receivedTable = 1;
284
285     delete CkpvAccess(locationTable);
286     //Delete all records in it too !!!!!!!!!!
287
288     CkpvAccess(locationTable) =  NULL;
289
290     CkpvAccess(locationTable) = new 
291         CkHashtableT<ClibGlobalArrayIndex, int>;
292
293     CkHashtableIterator *ht_iterator = htable.iterator();
294     ht_iterator->seekStart();
295     while(ht_iterator->hasNext()){
296         ClibGlobalArrayIndex *idx;
297         int *pe;
298         pe = (int *)ht_iterator->next((void **)&idx);
299         
300         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
301                      idx->idx.data()[0], *pe);
302
303         CkpvAccess(locationTable)->put(*idx) = *pe;       
304     }
305     
306     //Reset cached array element index. Location table may have changed
307     CkpvAccess(cache_index).nInts = -1;
308     CkpvAccess(cache_aid).setZero();
309
310     CkArrayID st_aid;
311     int st_nelements;
312     CkArrayIndexMax *st_elem;
313     int temp_curstratid = curStratID;
314
315     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
316     clib_stats.setNstrats(sw.total_nstrats);
317
318     //First recreate strategies
319     int count = 0;
320     for(count = 0; count < sw.nstrats; count ++) {
321         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
322         
323         //set the instance to the current count
324         //currently all strategies are being copied to all processors
325         //later strategies will be selectively copied
326         
327         //location of this strategy table entry in the strategy table
328         int loc = cur_strategy->getInstance();
329         
330         if(loc >= MAX_NUM_STRATS)
331             CkAbort("Strategy table is full \n");
332
333         CharmStrategy *old_strategy;
334
335         //If this is a learning decision and the old strategy has to
336         //be gotten rid of, finalize it here.
337         if((old_strategy = 
338             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
339            != NULL) {
340             old_strategy->finalizeProcessing();
341             
342             //Unregister from array listener if array strategy
343             if(old_strategy->getType() == ARRAY_STRATEGY) {
344                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
345                 as.getSourceArray(st_aid, st_elem, st_nelements);
346
347                 (* strategyTable)[loc].numElements = 0;
348                 if(!st_aid.isZero()) {
349                     ComlibArrayListener *calistener = CkArrayID::
350                         CkLocalBranch(st_aid)->getComlibArrayListener();
351                     
352                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
353                 }
354             }
355         }
356         
357         //Insert strategy, frees an old strategy and sets the
358         //strategy_table entry to point to the new one
359         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
360         
361         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
362
363         if(cur_strategy->getType() == ARRAY_STRATEGY &&
364            cur_strategy->isBracketed()){ 
365
366             ComlibPrintf("Inserting Array Listener\n");
367             
368             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
369             as.getSourceArray(st_aid, st_elem, st_nelements);
370             
371             (* strategyTable)[loc].numElements = 0;
372             if(!st_aid.isZero()) {            
373                 ComlibArrayListener *calistener = 
374                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
375                 
376                 calistener->registerStrategy(&((* strategyTable)[loc]));
377             }
378         }                      
379         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
380           (* strategyTable)[loc].numElements = 1;
381         }
382         
383         (* strategyTable)[loc].elementCount = 0;
384         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
385     }
386
387     //Resume all end iterarions. Newer strategies may have more 
388     //or fewer elements to expect for!!
389     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
390         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
391                      CkMyPe(), count,
392                      (* strategyTable)[count].nEndItr);
393                          
394         curStratID = count;
395         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
396             endIteration();  
397         
398         (* strategyTable)[count].nEndItr = 0;        
399     }           
400     
401     curStratID = temp_curstratid;
402     ComlibPrintf("receivedTable %d\n", sw.nstrats);
403     
404     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
405     cgproxy[curComlibController].barrier2();
406 }
407
408 void ComlibManager::resumeFromBarrier2(){
409
410     if(!receivedTable) 
411         //Application called atsync inbetween, receiveTable 
412         //and resumeFromBarrier2. This will only happen when there
413         //is no element on this processor and an element arrived, 
414         //leading to a resumeFromSync and hence an AtSync in comlib.
415         return; //A new resumeFromBarrier2 is on its way
416     
417     setupComplete = 1;
418
419     barrier2Reached = 1;
420     barrierReached = 0;
421
422     clibIteration ++;
423
424     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", 
425                  CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
426
427     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
428          count ++) {
429         if (!(* strategyTable)[count].tmplist.isEmpty()) {
430             
431             while (!(* strategyTable)[count].tmplist.isEmpty()) {
432                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
433                     (* strategyTable)[count].tmplist.deq();
434                 
435                 if((*strategyTable)[count].strategy->getType() == 
436                    ARRAY_STRATEGY) {
437                     if(cmsg->dest_proc >= 0) {
438                         envelope *env  = UsrToEnv(cmsg->getCharmMessage());
439                         cmsg->dest_proc = getLastKnown
440                             (env->getsetArrayMgr(),
441                              env->getsetArrayIndex());
442                     }
443                     //else multicast
444                 }                                
445                 
446                 if(cmsg->dest_proc == CkMyPe()) {
447                     CmiSyncSendAndFree(CkMyPe(), cmsg->size,
448                                        (char *)
449                                        UsrToEnv(cmsg->getCharmMessage()));
450                     delete cmsg;
451                 }
452                 else
453                     (* strategyTable)[count].strategy->insertMessage(cmsg);
454             }
455         }
456         
457         if ((* strategyTable)[count].call_doneInserting) {
458             (* strategyTable)[count].call_doneInserting = 0;
459             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
460             (* strategyTable)[count].strategy->doneInserting();
461         }
462     }
463     
464     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
465 }
466
467 extern int _charmHandlerIdx;
468
469 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
470                               const CkArrayIndexMax &idx, CkArrayID a){
471     
472     if(pd != NULL) {
473         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
474         setInstance(ci->_instid);
475     }
476     
477     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
478     
479     CkArrayIndexMax myidx = idx;
480     int dest_proc = getLastKnown(a, myidx); 
481     
482     //Reading from two hash tables is a big overhead
483     //int amgr_destpe = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
484     
485     register envelope * env = UsrToEnv(msg);
486     
487     env->getsetArrayMgr()=a;
488     env->getsetArraySrcPe()=CkMyPe();
489     env->getsetArrayEp()=ep;
490     env->getsetArrayHops()=0;
491     env->getsetArrayIndex()=idx;
492     env->setUsed(0);
493     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
494
495     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
496
497     if(isRemote) {
498         CkPackMessage(&env);        
499         CharmMessageHolder *cmsg = new 
500             CharmMessageHolder((char *)msg, dest_proc);
501         
502         remoteQ.enq(cmsg);
503         return;
504     }
505     
506     //With migration some array messages may be directly sent Also no
507     //message processing should happen before the comlib barriers have
508     //gone through
509     if(dest_proc == CkMyPe() && setupComplete){  
510         CkArray *amgr = (CkArray *)_localBranch(a);
511         amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
512         
513         return;
514     }
515
516     CkPackMessage(&env);
517     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
518     
519     //get rid of the new.
520     CharmMessageHolder *cmsg = new 
521         CharmMessageHolder((char *)msg, dest_proc);
522     
523     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, setupComplete);
524     
525     if (setupComplete)        
526         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
527     else 
528         (* strategyTable)[curStratID].tmplist.enq(cmsg);
529     
530     //CmiPrintf("After Insert\n");
531 }
532
533
534 #include "qd.h"
535 //CkpvExtern(QdState*, _qd);
536
537 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
538     
539
540     if(pd != NULL) {
541         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
542         setInstance(ci->_instid);
543     }
544
545     int dest_proc = onPE;
546
547     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
548                  UsrToEnv(msg)->getTotalsize());
549
550     register envelope * env = UsrToEnv(msg);
551     if(dest_proc == CkMyPe() && setupComplete){
552         _SET_USED(env, 0);
553         CkSendMsgBranch(ep, msg, dest_proc, gid);
554         return;
555     }
556     
557     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
558     CpvAccess(_qd)->create(1);
559
560     env->setMsgtype(ForBocMsg);
561     env->setEpIdx(ep);
562     env->setGroupNum(gid);
563     env->setSrcPe(CkMyPe());
564     env->setUsed(0);
565
566     CkPackMessage(&env);
567     CmiSetHandler(env, _charmHandlerIdx);
568
569     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
570     //get rid of the new.
571     
572     if(setupComplete)
573         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
574     else {
575         (* strategyTable)[curStratID].tmplist.enq(cmsg);
576     }
577 }
578
579 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
580     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
581
582     if(pd != NULL) {
583         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
584         setInstance(ci->_instid);
585     }
586     
587     //Broken, add the processor list here.
588
589     register envelope * env = UsrToEnv(m);
590     env->getsetArrayMgr()=a;
591     env->getsetArraySrcPe()=CkMyPe();
592     env->getsetArrayEp()=ep;
593     env->getsetArrayHops()=0;
594     env->getsetArrayIndex()= dummyArrayIndex;
595     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
596
597     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
598     
599     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
600
601     CharmMessageHolder *cmsg = new 
602         CharmMessageHolder((char *)m, IS_BROADCAST);
603     cmsg->npes = 0;
604     cmsg->pelist = NULL;
605     cmsg->sec_id = NULL;
606
607     multicast(cmsg);
608 }
609
610 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
611                                      CkArrayID a, CkSectionID &s, int opts) {
612
613 #ifndef CMK_OPTIMIZE
614     traceUserEvent(section_send_event);
615 #endif
616
617     if(pd != NULL) {
618         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
619         setInstance(ci->_instid);
620     }
621     
622     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
623
624     register envelope * env = UsrToEnv(m);
625     env->getsetArrayMgr()=a;
626     env->getsetArraySrcPe()=CkMyPe();
627     env->getsetArrayEp()=ep;
628     env->getsetArrayHops()=0;
629     env->getsetArrayIndex()= dummyArrayIndex;
630     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
631
632     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
633     
634     env->setUsed(0);    
635     CkPackMessage(&env);
636     
637     //Provide a dummy dest proc as it does not matter for mulitcast 
638     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
639                                                       IS_SECTION_MULTICAST);
640     cmsg->npes = 0;
641     cmsg->sec_id = &s;
642
643     CkSectionInfo minfo;
644     minfo.type = COMLIB_MULTICAST_MESSAGE;
645     minfo.sInfo.cInfo.instId = curStratID;
646     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
647     minfo.sInfo.cInfo.id = 0; 
648     minfo.pe = CkMyPe();
649     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
650     //    s.npes = 0;
651     //s.pelist = NULL;
652
653     multicast(cmsg);
654 }
655
656 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
657     
658     if(pd != NULL) {
659         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
660         setInstance(ci->_instid);
661     }
662     
663     register envelope * env = UsrToEnv(m);
664     
665     CpvAccess(_qd)->create(1);
666
667     env->setMsgtype(ForBocMsg);
668     env->setEpIdx(ep);
669     env->setGroupNum(g);
670     env->setSrcPe(CkMyPe());
671     env->setUsed(0);
672     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
673
674     CkPackMessage(&env);
675     CmiSetHandler(env, _charmHandlerIdx);
676     
677     //Provide a dummy dest proc as it does not matter for mulitcast 
678     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
679     
680     cmsg->npes = 0;
681     cmsg->pelist = NULL;
682
683     multicast(cmsg);
684 }
685
686 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
687
688     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
689     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
690
691     env->setUsed(0);    
692     CkPackMessage(&env);
693
694     //Will be used to detect multicast message for learning
695     
696     if (setupComplete)
697         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
698     else {
699         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
700         (* strategyTable)[curStratID].tmplist.enq(cmsg);
701     }
702
703     ComlibPrintf("After multicast\n");
704 }
705
706 //Collect statistics from all the processors, also gets the list of
707 //array elements on each processor.
708 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
709                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
710     
711     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
712
713     numStatsReceived ++;
714     clib_gstats.updateStats(stat, pe);
715     
716     for(int count = 0; count < idx_vec.length(); count++) {
717         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
718         
719         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
720                      pe);
721         
722         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
723     }        
724
725     //Reset cached array element index. Location table may have changed
726     CkpvAccess(cache_index).nInts = -1;
727     CkpvAccess(cache_aid).setZero();
728
729     if(numStatsReceived == CkNumPes()) {
730         numStatsReceived = 0;
731
732         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
733             count++ ){
734             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
735             if(strat->getType() > CONVERSE_STRATEGY) {
736                 CharmStrategy *cstrat = (CharmStrategy *)strat;
737                 ComlibLearner *learner = cstrat->getLearner();
738                 CharmStrategy *newstrat = NULL;
739                                 
740                 if(learner != NULL) {
741                     ComlibPrintf("Calling Learner\n");
742                     newstrat = (CharmStrategy *)learner->optimizePattern
743                         (strat, clib_gstats);
744                     
745                     if(newstrat != NULL)
746                         ListOfStrategies.enq(newstrat);
747                     else
748                         ListOfStrategies.enq(cstrat);
749                 }
750             }
751         }
752         barrierReached = 1;
753         
754         //if(lbUpdateReceived) {
755         //lbUpdateReceived = CmiFalse;
756         broadcastStrategies();
757         //}
758     }
759 }
760
761 void ComlibManager::setRemote(int remote_pe){
762
763     ComlibPrintf("Setting remote flag on\n");
764
765     remotePe = remote_pe;
766     isRemote = 1;
767 }
768
769
770 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
771                                       int strat_id) {
772     setInstance(strat_id);
773     
774     int nmsgs = rq.length();
775
776     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
777
778     for(int count = 0; count < nmsgs; count++) {
779         char *msg = rq.deq()->getCharmMessage();
780         envelope *env = UsrToEnv(msg);
781         
782         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
783                   env->getsetArrayMgr());
784     }
785
786     endIteration();
787 }
788
789 void ComlibManager::sendRemote(){
790     
791     int nmsgs = remoteQ.length();
792
793     //if(nmsgs == 0)
794     //  return;
795
796     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
797
798     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
799     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
800     
801     for(int count = 0; count < nmsgs; count++) {
802         CharmMessageHolder *cmsg = remoteQ.deq();
803         CkFreeMsg(cmsg->getCharmMessage());
804         delete cmsg;
805     }
806 }
807
808
809 void ComlibManager::AtSync() {
810
811     //comm_debug = 1;
812     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
813
814     barrier2Reached = 0;
815     receivedTable = 0;
816     setupComplete = 0;
817     barrierReached = 0;
818     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
819
820     int pos = 0;
821
822     CkVec<ClibGlobalArrayIndex> gidx_vec;
823
824     CkVec<CkArrayID> tmp_vec;
825     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
826         count ++) {
827         if((* strategyTable)[count].strategy->getType() 
828            == ARRAY_STRATEGY) {
829             CharmStrategy *cstrat = (CharmStrategy*)
830                 ((* strategyTable)[count].strategy);
831             
832             CkArrayID src, dest;
833             CkArrayIndexMax *elements;
834             int nelem;
835             
836             cstrat->ainfo.getSourceArray(src, elements, nelem);
837             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
838
839             CmiBool srcflag = CmiFalse;
840             CmiBool destflag = CmiFalse;
841             
842             if(src == dest || dest.isZero())
843                 destflag = CmiTrue;
844
845             if(src.isZero())
846                 srcflag = CmiTrue;                        
847
848             for(pos = 0; pos < tmp_vec.size(); pos++) {
849                 if(tmp_vec[pos] == src)
850                     srcflag = CmiTrue;
851
852                 if(tmp_vec[pos] == dest)
853                     destflag = CmiTrue;
854
855                 if(srcflag && destflag)
856                     break;
857             }
858
859             if(!srcflag)
860                 tmp_vec.insertAtEnd(src);
861
862             if(!destflag)
863                 tmp_vec.insertAtEnd(dest);
864         }
865         
866         //cant do it here, done in receiveTable
867         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
868         //  (* strategyTable)[count].reset();
869     }
870
871     for(pos = 0; pos < tmp_vec.size(); pos++) {
872         CkArrayID aid = tmp_vec[pos];
873
874         ComlibArrayListener *calistener = 
875             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
876
877         CkVec<CkArrayIndexMax> idx_vec;
878         calistener->getLocalIndices(idx_vec);
879
880         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
881             ClibGlobalArrayIndex gindex;
882             gindex.aid = aid;
883             gindex.idx = idx_vec[idx_count];
884
885             gidx_vec.insertAtEnd(gindex);
886         }
887     }
888
889     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
890     clib_stats.reset();
891 }
892
893 #include "lbdb.h"
894 #include "CentralLB.h"
895
896 /******** FOO BAR : NEEDS to be consistent with array manager *******/
897 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
898     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
899         CkAbort("LDB OBJ ID smaller than array index\n");
900     
901     //values higher than CkArrayIndexMax should be 0
902     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
903         idx.data()[count] = ld_id.id[count];
904     }
905     idx.nInts = 1;
906 }
907
908 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
909     for(int count = 0; count < msg->n_moves; count ++) {
910         MigrateInfo m = msg->moves[count];
911
912         CkArrayID aid; CkArrayIndexMax idx;
913         aid = CkArrayID(m.obj.omhandle.id.id);
914         LDObjID2IdxMax(m.obj.id, idx);
915
916         ClibGlobalArrayIndex cid; 
917         cid.aid = aid;
918         cid.idx = idx;
919         
920         int pe = CkpvAccess(locationTable)->get(cid);
921
922         //Value exists in the table, so update it
923         if(pe != 0) {
924             pe = m.to_pe + CkNumPes();
925             CkpvAccess(locationTable)->getRef(cid) = pe;
926         }
927         //otherwise we dont care about these objects
928     }   
929
930     lbUpdateReceived = CmiTrue;
931     if(barrierReached) {
932         broadcastStrategies();
933         barrierReached = 0;
934     }
935
936     CkFreeMsg(msg);
937 }
938
939 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
940     ComlibInstanceHandle *inst = new ComlibInstanceHandle
941         (*((ComlibInstanceHandle *)data));
942     return inst;
943 }
944
945
946 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
947                                                    CkDelegateData *pd) {
948
949     if(pd == NULL)
950         return NULL;
951
952     CmiBool to_pup = CmiFalse;
953
954     ComlibInstanceHandle *inst; 
955     if(!p.isUnpacking()) {
956         inst = (ComlibInstanceHandle *) pd;       
957         if(pd != NULL)
958             to_pup = CmiTrue;
959     }
960     else 
961         //Call migrate constructor
962         inst = new ComlibInstanceHandle();
963
964     p | to_pup;
965
966     if(to_pup)
967         inst->pup(p);
968     return inst;
969 }    
970
971 void ComlibDelegateProxy(CProxy *proxy){
972     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
973     proxy->ckDelegate(cgproxy.ckLocalBranch(), NULL);
974 }
975
976 void ComlibAssociateProxy(ComlibInstanceHandle *cinst, CProxy &proxy) {
977     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
978     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
979 }
980
981 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
982     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
983         (CkGetComlibInstance());
984
985     cinst->setStrategy(strat);
986     
987     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
988     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
989
990
991 ComlibInstanceHandle ComlibRegister(CharmStrategy *strat) {
992     ComlibInstanceHandle cinst(CkGetComlibInstance());
993     cinst.setStrategy(strat);
994     return cinst;
995 }
996
997 void ComlibBegin(CProxy &proxy) {
998     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
999     cinst->beginIteration();
1000
1001
1002 void ComlibEnd(CProxy &proxy) {
1003     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
1004     cinst->endIteration();
1005 }
1006
1007 ComlibInstanceHandle CkCreateComlibInstance(){
1008     return CkGetComlibInstance();
1009 }
1010
1011 ComlibInstanceHandle CkGetComlibInstance() {
1012     if(CkMyPe() != 0)
1013         CkAbort("Comlib Instance can only be created on Processor 0");
1014     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
1015     return (cgproxy.ckLocalBranch())->createInstance();
1016 }
1017
1018 ComlibInstanceHandle CkGetComlibInstance(int id) {
1019     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
1020     return cinst;
1021 }
1022
1023 void ComlibDoneCreating(){
1024     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1025     (cgproxy.ckLocalBranch())->broadcastStrategies();
1026 }
1027
1028 char *router;
1029 int sfactor=0;
1030
1031 class ComlibManagerMain {
1032 public:
1033     ComlibManagerMain(CkArgMsg *msg) {
1034         
1035         if(CkMyPe() == 0 && msg !=  NULL)
1036             CmiGetArgString(msg->argv, "+strategy", &router);         
1037
1038         if(CkMyPe() == 0 && msg !=  NULL)
1039             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1040         
1041         CProxy_ComlibManager::ckNew();
1042     }
1043 };
1044
1045 //Called by user code
1046 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1047     _instid = -1;
1048     _dmid.setZero();
1049     _srcPe = -1;
1050     toForward = 0;
1051 }
1052
1053 //Called by user code
1054 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1055     : CkDelegateData() {
1056     _instid = h._instid;
1057     _dmid = h._dmid;
1058     toForward = h.toForward;        
1059
1060     ComlibPrintf("In Copy Constructor\n");
1061     _srcPe = h._srcPe;
1062
1063     reset();
1064     ref();
1065 }
1066
1067 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1068     _instid = h._instid;
1069     _dmid = h._dmid;
1070     toForward = h.toForward;
1071     _srcPe = h._srcPe;
1072     
1073     reset();
1074     ref();
1075     return *this;
1076 }
1077
1078 //Called by the communication library
1079 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1080     _instid = instid;
1081     _dmid   = dmid;
1082     _srcPe  = -1;
1083     toForward = 0;
1084 }
1085
1086 void ComlibInstanceHandle::beginIteration() { 
1087     CProxy_ComlibManager cgproxy(_dmid);
1088
1089     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1090
1091     //User forgot to make the instance handle a readonly or pass it
1092     //into the constructor of an array and is using it directly from
1093     //Main :: main
1094     if(_srcPe == -1) {
1095         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1096         _srcPe = CkMyPe();
1097     }
1098
1099     if(_srcPe != CkMyPe() && toForward) {
1100         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1101     }
1102
1103     (cgproxy.ckLocalBranch())->setInstance(_instid);
1104     (cgproxy.ckLocalBranch())->beginIteration();   
1105 }
1106
1107 void ComlibInstanceHandle::endIteration() {
1108     CProxy_ComlibManager cgproxy(_dmid);
1109     (cgproxy.ckLocalBranch())->endIteration();
1110 }
1111
1112 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1113     toForward = s->getForwardOnMigration();
1114     CProxy_ComlibManager cgproxy(_dmid);
1115     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1116 }
1117
1118 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1119     if(_instid < 0) 
1120         return NULL;    
1121     
1122     CProxy_ComlibManager cgproxy(_dmid);
1123     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1124 }
1125
1126 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1127
1128 void ComlibInitSectionID(CkSectionID &sid){
1129     
1130     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1131     sid._cookie.pe = CkMyPe();
1132     
1133     sid._cookie.sInfo.cInfo.id = 0;    
1134     sid.npes = 0;
1135     sid.pelist = NULL;
1136 }
1137
1138 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1139     CkSectionID &sid = sproxy->ckGetSectionID();
1140     ComlibInitSectionID(sid);
1141     sid._cookie.sInfo.cInfo.status = 0;
1142 }
1143
1144 // for backward compatibility - for old name commlib
1145 void _registercommlib(void)
1146 {
1147   static int _done = 0; 
1148   if(_done) 
1149       return; 
1150   _done = 1;
1151   _registercomlib();
1152 }
1153
1154 void ComlibAtSyncHandler(void *msg) {
1155     CmiFree(msg);
1156     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1157     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1158     if(cmgr_ptr)
1159         cmgr_ptr->AtSync();    
1160 }
1161
1162 void ComlibNotifyMigrationDoneHandler(void *msg) {
1163     CmiFree(msg);
1164     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1165     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1166     if(cmgr_ptr)
1167         cmgr_ptr->AtSync();    
1168 }
1169
1170
1171 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1172     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1173     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1174 }
1175
1176 #include "comlib.def.h"
1177
1178 /*@}*/