4d54cd14588d9009a10a6c6cd06ff555ddc7443d
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "MultiRingMulticast.h"
12 #include "PipeBroadcastStrategy.h"
13 #include "BroadcastStrategy.h"
14 #include "MeshStreamingStrategy.h"
15 #include "PrioStreaming.h"
16
17 CkpvExtern(int, RecvdummyHandle);
18
19 CkpvDeclare(int, RecvmsgHandle);
20 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
21 CkpvDeclare(CkGroupID, cmgrID);
22 CkpvExtern(ConvComlibManager *, conv_com_ptr);
23
24 //Handler to receive array messages
25 void recv_array_msg(void *msg){
26
27     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
28
29     if(msg == NULL)
30         return;
31     
32     register envelope* env = (envelope *)msg;
33     env->setUsed(0);
34     env->getsetArrayHops()=1;
35     CkUnpackMessage(&env);
36
37     int srcPe = env->getSrcPe();
38     int sid = ((CmiMsgHeaderExt *) env)->stratid;
39
40     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
41              sid, env->getTotalsize(), srcPe);
42
43     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
44     
45     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
46     //if(!comm_debug)
47     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
48
49     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
50     return;
51 }
52
53 void recv_combined_array_msg(void *msg){
54     if(msg == NULL)
55         return;
56     
57     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
58
59     MsgPacker::deliver((CombinedMessage *)msg);
60 }
61
62 ComlibManager::ComlibManager(){
63     init();
64     ComlibPrintf("In comlibmanager constructor\n");
65 }
66
67 void ComlibManager::init(){
68     
69     initComlibManager();
70
71     PUPable_reg(CharmStrategy);
72     PUPable_reg(CharmMessageHolder);
73     
74     //comm_debug = 1;
75     
76     numStatsReceived = 0;
77     curComlibController = 0;
78     clibIteration = 0;
79     
80     strategyCreated = CmiFalse;
81
82     CkpvInitialize(ClibLocationTableType*, locationTable);
83     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
84
85     CkpvInitialize(CkArrayIndexMax, cache_index);
86     CkpvInitialize(int, cache_pe);
87     CkpvInitialize(CkArrayID, cache_aid);
88
89     CkpvAccess(cache_index).nInts = -1;
90     CkpvAccess(cache_aid).setZero();
91
92     CkpvInitialize(int, RecvmsgHandle);
93     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
94
95     bcast_pelist = new int [CkNumPes()];
96     for(int brcount = 0; brcount < CkNumPes(); brcount++)
97         bcast_pelist[brcount] = brcount;
98
99     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
100     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
101         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
102     
103     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
104     
105     npes = CkNumPes();
106     pelist = NULL;
107
108     CkpvInitialize(CkGroupID, cmgrID);
109     CkpvAccess(cmgrID) = thisgroup;
110
111     dummyArrayIndex.nInts = 0;
112
113     curStratID = 0;
114     prevStratID = -1;
115     //prioEndIterationFlag = 1;
116
117     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
118     
119     receivedTable = 0;
120     setupComplete = 0;
121
122     barrierReached = 0;
123     barrier2Reached = 0;
124
125     bcount = b2count = 0;
126     lbUpdateReceived = CmiFalse;
127
128     isRemote = 0;
129     remotePe = -1;
130
131     CkpvInitialize(int, migrationDoneHandlerID);
132     CkpvAccess(migrationDoneHandlerID) = 
133         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
134     
135     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
136     cgproxy[curComlibController].barrier();
137 }
138
139 //First barrier makes sure that the communication library group 
140 //has been created on all processors
141 void ComlibManager::barrier(){
142     ComlibPrintf("In barrier %d\n", bcount);
143     if(CkMyPe() == 0) {
144         bcount ++;
145         if(bcount == CkNumPes()){
146             bcount = 0;
147             barrierReached = 1;
148             barrier2Reached = 0;
149
150             if(strategyCreated)
151                 broadcastStrategies();
152         }
153     }
154 }
155
156 //Has finished passing the strategy list to all the processors
157 void ComlibManager::barrier2(){
158     if(CkMyPe() == 0) {
159         b2count ++;
160         ComlibPrintf("In barrier2 %d\n", bcount);
161         if(b2count == CkNumPes()) {
162             b2count = 0; 
163             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
164             cgproxy.resumeFromBarrier2();
165         }
166     }
167 }
168
169 //Registers a set of strategies with the communication library
170 ComlibInstanceHandle ComlibManager::createInstance() {
171   
172     CkpvAccess(conv_com_ptr)->nstrats++;    
173     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
174                                CkpvAccess(cmgrID));  
175     return cinst;
176 }
177
178 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
179     
180     strategyCreated = true;
181
182     ListOfStrategies.enq(strat);
183     strat->setInstance(pos);
184 }
185
186 //End of registering function, if barriers have been reached send them over
187 void ComlibManager::broadcastStrategies() {
188     if(!barrierReached)
189       return;    
190
191     lbUpdateReceived = CmiFalse;
192     barrierReached = 0;
193
194     ComlibPrintf("Sending Strategies %d, %d\n", 
195                  CkpvAccess(conv_com_ptr)->nstrats, 
196                  ListOfStrategies.length());
197
198     StrategyWrapper sw;
199     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
200
201     if(ListOfStrategies.length() > 0) {
202         int len = ListOfStrategies.length();
203         sw.s_table = new Strategy* [len];
204         sw.nstrats = len;
205         
206         for (int count=0; count < len; count++)
207             sw.s_table[count] = ListOfStrategies.deq();
208     }
209     else {
210         sw.nstrats = 0;
211         sw.s_table = 0;
212     }
213
214     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
215     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
216 }
217
218 //Called when the array/group element starts sending messages
219 void ComlibManager::beginIteration(){
220     //right now does not do anything might need later
221     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
222     //prioEndIterationFlag = 0;
223 }
224
225 void ComlibManager::setInstance(int instID){
226
227     curStratID = instID;
228     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
229 }
230
231 //called when the array elements has finished sending messages
232 void ComlibManager::endIteration(){
233     //    prioEndIterationFlag = 1;
234     prevStratID = -1;
235     
236     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
237                  curStratID, 
238                  (* strategyTable)[curStratID].elementCount, 
239                  (* strategyTable)[curStratID].numElements);
240
241     if(isRemote) {
242         isRemote = 0;
243         sendRemote();
244         return;
245     }
246
247     if(!receivedTable) {
248         (* strategyTable)[curStratID].nEndItr++;
249         return;
250     }        
251     
252     (* strategyTable)[curStratID].elementCount++;
253     int count = 0;
254     
255     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
256         
257         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
258         
259         if(barrier2Reached) {       
260             (* strategyTable)[curStratID].strategy->doneInserting();
261         }
262         else (* strategyTable)[curStratID].call_doneInserting = 1;
263         
264         (* strategyTable)[curStratID].elementCount = 0;
265     }
266     ComlibPrintf("After EndIteration\n");
267 }
268
269 //receive the list of strategies
270 //Insert the strategies into the strategy table in converse comm lib.
271 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
272 void ComlibManager::receiveTable(StrategyWrapper &sw, 
273                                  CkHashtableT<ClibGlobalArrayIndex, int> 
274                                  &htable)
275 {
276
277     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
278                  clibIteration);
279
280     receivedTable = 1;
281
282     delete CkpvAccess(locationTable);
283     //Delete all records in it too !!!!!!!!!!
284
285     CkpvAccess(locationTable) =  NULL;
286
287     CkpvAccess(locationTable) = new 
288         CkHashtableT<ClibGlobalArrayIndex, int>;
289
290     CkHashtableIterator *ht_iterator = htable.iterator();
291     ht_iterator->seekStart();
292     while(ht_iterator->hasNext()){
293         ClibGlobalArrayIndex *idx;
294         int *pe;
295         pe = (int *)ht_iterator->next((void **)&idx);
296         
297         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
298                      idx->idx.data()[0], *pe);
299
300         CkpvAccess(locationTable)->put(*idx) = *pe;       
301     }
302     
303     //Reset cached array element index. Location table may have changed
304     CkpvAccess(cache_index).nInts = -1;
305     CkpvAccess(cache_aid).setZero();
306
307     CkArrayID st_aid;
308     int st_nelements;
309     CkArrayIndexMax *st_elem;
310     int temp_curstratid = curStratID;
311
312     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
313     clib_stats.setNstrats(sw.total_nstrats);
314
315     //First recreate strategies
316     int count = 0;
317     for(count = 0; count < sw.nstrats; count ++) {
318         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
319         
320         //set the instance to the current count
321         //currently all strategies are being copied to all processors
322         //later strategies will be selectively copied
323         
324         //location of this strategy table entry in the strategy table
325         int loc = cur_strategy->getInstance();
326         
327         if(loc >= MAX_NUM_STRATS)
328             CkAbort("Strategy table is full \n");
329
330         CharmStrategy *old_strategy;
331
332         //If this is a learning decision and the old strategy has to
333         //be gotten rid of, finalize it here.
334         if((old_strategy = 
335             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
336            != NULL) {
337             old_strategy->finalizeProcessing();
338             
339             //Unregister from array listener if array strategy
340             if(old_strategy->getType() == ARRAY_STRATEGY) {
341                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
342                 as.getSourceArray(st_aid, st_elem, st_nelements);
343
344                 (* strategyTable)[loc].numElements = 0;
345                 if(!st_aid.isZero()) {
346                     ComlibArrayListener *calistener = CkArrayID::
347                         CkLocalBranch(st_aid)->getComlibArrayListener();
348                     
349                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
350                 }
351             }
352         }
353         
354         //Insert strategy, frees an old strategy and sets the
355         //strategy_table entry to point to the new one
356         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
357         
358         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
359
360         if(cur_strategy->getType() == ARRAY_STRATEGY &&
361            cur_strategy->isBracketed()){ 
362
363             ComlibPrintf("Inserting Array Listener\n");
364             
365             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
366             as.getSourceArray(st_aid, st_elem, st_nelements);
367             
368             (* strategyTable)[loc].numElements = 0;
369             if(!st_aid.isZero()) {            
370                 ComlibArrayListener *calistener = 
371                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
372                 
373                 calistener->registerStrategy(&((* strategyTable)[loc]));
374             }
375         }                      
376         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
377           (* strategyTable)[loc].numElements = 1;
378         }
379         
380         (* strategyTable)[loc].elementCount = 0;
381         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
382     }
383
384     //Resume all end iterarions. Newer strategies may have more 
385     //or fewer elements to expect for!!
386     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
387         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
388                      CkMyPe(), count,
389                      (* strategyTable)[count].nEndItr);
390                          
391         curStratID = count;
392         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
393             endIteration();  
394         
395         (* strategyTable)[count].nEndItr = 0;        
396     }           
397     
398     curStratID = temp_curstratid;
399     ComlibPrintf("receivedTable %d\n", sw.nstrats);
400     
401     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
402     cgproxy[curComlibController].barrier2();
403 }
404
405 void ComlibManager::resumeFromBarrier2(){
406
407     if(!receivedTable) 
408         //Application called atsync inbetween, receiveTable 
409         //and resumeFromBarrier2. This will only happen when there
410         //is no element on this processor and an element arrived, 
411         //leading to a resumeFromSync and hence an AtSync in comlib.
412         return; //A new resumeFromBarrier2 is on its way
413     
414     setupComplete = 1;
415
416     barrier2Reached = 1;
417     barrierReached = 0;
418
419     clibIteration ++;
420
421     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", 
422                  CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
423
424     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
425          count ++) {
426         if (!(* strategyTable)[count].tmplist.isEmpty()) {
427             
428             while (!(* strategyTable)[count].tmplist.isEmpty()) {
429                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
430                     (* strategyTable)[count].tmplist.deq();
431                 
432                 if((*strategyTable)[count].strategy->getType() == 
433                    ARRAY_STRATEGY) {
434                     if(cmsg->dest_proc >= 0) {
435                         envelope *env  = UsrToEnv(cmsg->getCharmMessage());
436                         cmsg->dest_proc = getLastKnown
437                             (env->getsetArrayMgr(),
438                              env->getsetArrayIndex());
439                     }
440                     //else multicast
441                 }                                
442                 
443                 if(cmsg->dest_proc == CkMyPe()) {
444                     CmiSyncSendAndFree(CkMyPe(), cmsg->size,
445                                        (char *)
446                                        UsrToEnv(cmsg->getCharmMessage()));
447                     delete cmsg;
448                 }
449                 else
450                     (* strategyTable)[count].strategy->insertMessage(cmsg);
451             }
452         }
453         
454         if ((* strategyTable)[count].call_doneInserting) {
455             (* strategyTable)[count].call_doneInserting = 0;
456             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
457             (* strategyTable)[count].strategy->doneInserting();
458         }
459     }
460     
461     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
462 }
463
464 extern int _charmHandlerIdx;
465
466 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
467                               const CkArrayIndexMax &idx, CkArrayID a){
468     
469     if(pd != NULL) {
470         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
471         setInstance(ci->_instid);
472     }
473     
474     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
475     
476     CkArrayIndexMax myidx = idx;
477     int dest_proc = getLastKnown(a, myidx); 
478     
479     //Reading from two hash tables is a big overhead
480     //int amgr_destpe = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
481     
482     register envelope * env = UsrToEnv(msg);
483     
484     env->getsetArrayMgr()=a;
485     env->getsetArraySrcPe()=CkMyPe();
486     env->getsetArrayEp()=ep;
487     env->getsetArrayHops()=0;
488     env->getsetArrayIndex()=idx;
489     env->setUsed(0);
490     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
491
492     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
493
494     if(isRemote) {
495         CkPackMessage(&env);        
496         CharmMessageHolder *cmsg = new 
497             CharmMessageHolder((char *)msg, dest_proc);
498         
499         remoteQ.enq(cmsg);
500         return;
501     }
502     
503     //With migration some array messages may be directly sent Also no
504     //message processing should happen before the comlib barriers have
505     //gone through
506     if(dest_proc == CkMyPe() && setupComplete){  
507         CkArray *amgr = (CkArray *)_localBranch(a);
508         amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
509         
510         return;
511     }
512
513     CkPackMessage(&env);
514     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
515     
516     //get rid of the new.
517     CharmMessageHolder *cmsg = new 
518         CharmMessageHolder((char *)msg, dest_proc);
519     
520     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, setupComplete);
521     
522     if (setupComplete)        
523         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
524     else 
525         (* strategyTable)[curStratID].tmplist.enq(cmsg);
526     
527     //CmiPrintf("After Insert\n");
528 }
529
530
531 #include "qd.h"
532 //CkpvExtern(QdState*, _qd);
533
534 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
535     
536
537     if(pd != NULL) {
538         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
539         setInstance(ci->_instid);
540     }
541
542     int dest_proc = onPE;
543
544     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
545                  UsrToEnv(msg)->getTotalsize());
546
547     register envelope * env = UsrToEnv(msg);
548     if(dest_proc == CkMyPe() && setupComplete){
549         _SET_USED(env, 0);
550         CkSendMsgBranch(ep, msg, dest_proc, gid);
551         return;
552     }
553     
554     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
555     CpvAccess(_qd)->create(1);
556
557     env->setMsgtype(ForBocMsg);
558     env->setEpIdx(ep);
559     env->setGroupNum(gid);
560     env->setSrcPe(CkMyPe());
561     env->setUsed(0);
562
563     CkPackMessage(&env);
564     CmiSetHandler(env, _charmHandlerIdx);
565
566     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
567     //get rid of the new.
568     
569     if(setupComplete)
570         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
571     else {
572         (* strategyTable)[curStratID].tmplist.enq(cmsg);
573     }
574 }
575
576 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
577     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
578
579     if(pd != NULL) {
580         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
581         setInstance(ci->_instid);
582     }
583     
584     //Broken, add the processor list here.
585
586     register envelope * env = UsrToEnv(m);
587     env->getsetArrayMgr()=a;
588     env->getsetArraySrcPe()=CkMyPe();
589     env->getsetArrayEp()=ep;
590     env->getsetArrayHops()=0;
591     env->getsetArrayIndex()= dummyArrayIndex;
592     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
593
594     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
595     
596     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
597
598     CharmMessageHolder *cmsg = new 
599         CharmMessageHolder((char *)m, IS_BROADCAST);
600     cmsg->npes = 0;
601     cmsg->pelist = NULL;
602     cmsg->sec_id = NULL;
603
604     multicast(cmsg);
605 }
606
607 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
608                                      CkArrayID a, CkSectionID &s, int opts) {
609
610 #ifndef CMK_OPTIMIZE
611     traceUserEvent(section_send_event);
612 #endif
613
614     if(pd != NULL) {
615         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
616         setInstance(ci->_instid);
617     }
618     
619     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
620
621     register envelope * env = UsrToEnv(m);
622     env->getsetArrayMgr()=a;
623     env->getsetArraySrcPe()=CkMyPe();
624     env->getsetArrayEp()=ep;
625     env->getsetArrayHops()=0;
626     env->getsetArrayIndex()= dummyArrayIndex;
627     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
628
629     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
630     
631     env->setUsed(0);    
632     CkPackMessage(&env);
633     
634     //Provide a dummy dest proc as it does not matter for mulitcast 
635     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
636                                                       IS_SECTION_MULTICAST);
637     cmsg->npes = 0;
638     cmsg->sec_id = &s;
639
640     CkSectionInfo minfo;
641     minfo.type = COMLIB_MULTICAST_MESSAGE;
642     minfo.sInfo.cInfo.instId = curStratID;
643     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
644     minfo.sInfo.cInfo.id = 0; 
645     minfo.pe = CkMyPe();
646     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
647     //    s.npes = 0;
648     //s.pelist = NULL;
649
650     multicast(cmsg);
651 }
652
653 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
654     
655     if(pd != NULL) {
656         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
657         setInstance(ci->_instid);
658     }
659     
660     register envelope * env = UsrToEnv(m);
661     
662     CpvAccess(_qd)->create(1);
663
664     env->setMsgtype(ForBocMsg);
665     env->setEpIdx(ep);
666     env->setGroupNum(g);
667     env->setSrcPe(CkMyPe());
668     env->setUsed(0);
669     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
670
671     CkPackMessage(&env);
672     CmiSetHandler(env, _charmHandlerIdx);
673     
674     //Provide a dummy dest proc as it does not matter for mulitcast 
675     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
676     
677     cmsg->npes = 0;
678     cmsg->pelist = NULL;
679
680     multicast(cmsg);
681 }
682
683 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
684
685     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
686     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
687
688     env->setUsed(0);    
689     CkPackMessage(&env);
690
691     //Will be used to detect multicast message for learning
692     
693     if (setupComplete)
694         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
695     else {
696         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
697         (* strategyTable)[curStratID].tmplist.enq(cmsg);
698     }
699
700     ComlibPrintf("After multicast\n");
701 }
702
703 //Collect statistics from all the processors, also gets the list of
704 //array elements on each processor.
705 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
706                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
707     
708     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
709
710     numStatsReceived ++;
711     clib_gstats.updateStats(stat, pe);
712     
713     for(int count = 0; count < idx_vec.length(); count++) {
714         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
715         
716         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
717                      pe);
718         
719         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
720     }        
721
722     //Reset cached array element index. Location table may have changed
723     CkpvAccess(cache_index).nInts = -1;
724     CkpvAccess(cache_aid).setZero();
725
726     if(numStatsReceived == CkNumPes()) {
727         numStatsReceived = 0;
728
729         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
730             count++ ){
731             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
732             if(strat->getType() > CONVERSE_STRATEGY) {
733                 CharmStrategy *cstrat = (CharmStrategy *)strat;
734                 ComlibLearner *learner = cstrat->getLearner();
735                 CharmStrategy *newstrat = NULL;
736                                 
737                 if(learner != NULL) {
738                     ComlibPrintf("Calling Learner\n");
739                     newstrat = (CharmStrategy *)learner->optimizePattern
740                         (strat, clib_gstats);
741                     
742                     if(newstrat != NULL)
743                         ListOfStrategies.enq(newstrat);
744                     else
745                         ListOfStrategies.enq(cstrat);
746                 }
747             }
748         }
749         barrierReached = 1;
750         
751         //if(lbUpdateReceived) {
752         //lbUpdateReceived = CmiFalse;
753         broadcastStrategies();
754         //}
755     }
756 }
757
758 void ComlibManager::setRemote(int remote_pe){
759
760     ComlibPrintf("Setting remote flag on\n");
761
762     remotePe = remote_pe;
763     isRemote = 1;
764 }
765
766
767 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
768                                       int strat_id) {
769     setInstance(strat_id);
770     
771     int nmsgs = rq.length();
772
773     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
774
775     for(int count = 0; count < nmsgs; count++) {
776         char *msg = rq.deq()->getCharmMessage();
777         envelope *env = UsrToEnv(msg);
778         
779         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
780                   env->getsetArrayMgr());
781     }
782
783     endIteration();
784 }
785
786 void ComlibManager::sendRemote(){
787     
788     int nmsgs = remoteQ.length();
789
790     //if(nmsgs == 0)
791     //  return;
792
793     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
794
795     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
796     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
797     
798     for(int count = 0; count < nmsgs; count++) {
799         CharmMessageHolder *cmsg = remoteQ.deq();
800         CkFreeMsg(cmsg->getCharmMessage());
801         delete cmsg;
802     }
803 }
804
805
806 void ComlibManager::AtSync() {
807
808     //comm_debug = 1;
809     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
810
811     barrier2Reached = 0;
812     receivedTable = 0;
813     setupComplete = 0;
814     barrierReached = 0;
815     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
816
817     int pos = 0;
818
819     CkVec<ClibGlobalArrayIndex> gidx_vec;
820
821     CkVec<CkArrayID> tmp_vec;
822     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
823         count ++) {
824         if((* strategyTable)[count].strategy->getType() 
825            == ARRAY_STRATEGY) {
826             CharmStrategy *cstrat = (CharmStrategy*)
827                 ((* strategyTable)[count].strategy);
828             
829             CkArrayID src, dest;
830             CkArrayIndexMax *elements;
831             int nelem;
832             
833             cstrat->ainfo.getSourceArray(src, elements, nelem);
834             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
835
836             CmiBool srcflag = CmiFalse;
837             CmiBool destflag = CmiFalse;
838             
839             if(src == dest || dest.isZero())
840                 destflag = CmiTrue;
841
842             if(src.isZero())
843                 srcflag = CmiTrue;                        
844
845             for(pos = 0; pos < tmp_vec.size(); pos++) {
846                 if(tmp_vec[pos] == src)
847                     srcflag = CmiTrue;
848
849                 if(tmp_vec[pos] == dest)
850                     destflag = CmiTrue;
851
852                 if(srcflag && destflag)
853                     break;
854             }
855
856             if(!srcflag)
857                 tmp_vec.insertAtEnd(src);
858
859             if(!destflag)
860                 tmp_vec.insertAtEnd(dest);
861         }
862         
863         //cant do it here, done in receiveTable
864         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
865         //  (* strategyTable)[count].reset();
866     }
867
868     for(pos = 0; pos < tmp_vec.size(); pos++) {
869         CkArrayID aid = tmp_vec[pos];
870
871         ComlibArrayListener *calistener = 
872             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
873
874         CkVec<CkArrayIndexMax> idx_vec;
875         calistener->getLocalIndices(idx_vec);
876
877         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
878             ClibGlobalArrayIndex gindex;
879             gindex.aid = aid;
880             gindex.idx = idx_vec[idx_count];
881
882             gidx_vec.insertAtEnd(gindex);
883         }
884     }
885
886     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
887     clib_stats.reset();
888 }
889
890 #include "lbdb.h"
891 #include "CentralLB.h"
892
893 /******** FOO BAR : NEEDS to be consistent with array manager *******/
894 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
895     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
896         CkAbort("LDB OBJ ID smaller than array index\n");
897     
898     //values higher than CkArrayIndexMax should be 0
899     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
900         idx.data()[count] = ld_id.id[count];
901     }
902     idx.nInts = 1;
903 }
904
905 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
906     for(int count = 0; count < msg->n_moves; count ++) {
907         MigrateInfo m = msg->moves[count];
908
909         CkArrayID aid; CkArrayIndexMax idx;
910         aid = CkArrayID(m.obj.omhandle.id.id);
911         LDObjID2IdxMax(m.obj.id, idx);
912
913         ClibGlobalArrayIndex cid; 
914         cid.aid = aid;
915         cid.idx = idx;
916         
917         int pe = CkpvAccess(locationTable)->get(cid);
918
919         //Value exists in the table, so update it
920         if(pe != 0) {
921             pe = m.to_pe + CkNumPes();
922             CkpvAccess(locationTable)->getRef(cid) = pe;
923         }
924         //otherwise we dont care about these objects
925     }   
926
927     lbUpdateReceived = CmiTrue;
928     if(barrierReached) {
929         broadcastStrategies();
930         barrierReached = 0;
931     }
932
933     CkFreeMsg(msg);
934 }
935
936 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
937     ComlibInstanceHandle *inst = new ComlibInstanceHandle
938         (*((ComlibInstanceHandle *)data));
939     return inst;
940 }
941
942
943 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
944                                                    CkDelegateData *pd) {
945
946     if(pd == NULL)
947         return NULL;
948
949     CmiBool to_pup = CmiFalse;
950
951     ComlibInstanceHandle *inst; 
952     if(!p.isUnpacking()) {
953         inst = (ComlibInstanceHandle *) pd;       
954         if(pd != NULL)
955             to_pup = CmiTrue;
956     }
957     else 
958         //Call migrate constructor
959         inst = new ComlibInstanceHandle();
960
961     p | to_pup;
962
963     if(to_pup)
964         inst->pup(p);
965     return inst;
966 }    
967
968 void ComlibDelegateProxy(CProxy *proxy){
969     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
970     proxy->ckDelegate(cgproxy.ckLocalBranch(), NULL);
971 }
972
973 void ComlibAssociateProxy(ComlibInstanceHandle *cinst, CProxy &proxy) {
974     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
975     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
976 }
977
978 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
979     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
980         (CkGetComlibInstance());
981
982     cinst->setStrategy(strat);
983     
984     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
985     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
986
987
988 ComlibInstanceHandle ComlibRegister(CharmStrategy *strat) {
989     ComlibInstanceHandle cinst(CkGetComlibInstance());
990     cinst.setStrategy(strat);
991     return cinst;
992 }
993
994 void ComlibBegin(CProxy &proxy) {
995     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
996     cinst->beginIteration();
997
998
999 void ComlibEnd(CProxy &proxy) {
1000     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
1001     cinst->endIteration();
1002 }
1003
1004 ComlibInstanceHandle CkCreateComlibInstance(){
1005     return CkGetComlibInstance();
1006 }
1007
1008 ComlibInstanceHandle CkGetComlibInstance() {
1009     if(CkMyPe() != 0)
1010         CkAbort("Comlib Instance can only be created on Processor 0");
1011     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
1012     return (cgproxy.ckLocalBranch())->createInstance();
1013 }
1014
1015 ComlibInstanceHandle CkGetComlibInstance(int id) {
1016     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
1017     return cinst;
1018 }
1019
1020 void ComlibDoneCreating(){
1021     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1022     (cgproxy.ckLocalBranch())->broadcastStrategies();
1023 }
1024
1025 char *router;
1026 int sfactor=0;
1027
1028 class ComlibManagerMain {
1029 public:
1030     ComlibManagerMain(CkArgMsg *msg) {
1031         
1032         if(CkMyPe() == 0 && msg !=  NULL)
1033             CmiGetArgString(msg->argv, "+strategy", &router);         
1034
1035         if(CkMyPe() == 0 && msg !=  NULL)
1036             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1037         
1038         CProxy_ComlibManager::ckNew();
1039     }
1040 };
1041
1042 //Called by user code
1043 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1044     _instid = -1;
1045     _dmid.setZero();
1046     _srcPe = -1;
1047     toForward = 0;
1048 }
1049
1050 //Called by user code
1051 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1052     : CkDelegateData() {
1053     _instid = h._instid;
1054     _dmid = h._dmid;
1055     toForward = h.toForward;        
1056
1057     ComlibPrintf("In Copy Constructor\n");
1058     _srcPe = h._srcPe;
1059
1060     reset();
1061     ref();
1062 }
1063
1064 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1065     _instid = h._instid;
1066     _dmid = h._dmid;
1067     toForward = h.toForward;
1068     _srcPe = h._srcPe;
1069     
1070     reset();
1071     ref();
1072     return *this;
1073 }
1074
1075 //Called by the communication library
1076 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1077     _instid = instid;
1078     _dmid   = dmid;
1079     _srcPe  = -1;
1080     toForward = 0;
1081 }
1082
1083 void ComlibInstanceHandle::beginIteration() { 
1084     CProxy_ComlibManager cgproxy(_dmid);
1085
1086     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1087
1088     //User forgot to make the instance handle a readonly or pass it
1089     //into the constructor of an array and is using it directly from
1090     //Main :: main
1091     if(_srcPe == -1) {
1092         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1093         _srcPe = CkMyPe();
1094     }
1095
1096     if(_srcPe != CkMyPe() && toForward) {
1097         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1098     }
1099
1100     (cgproxy.ckLocalBranch())->setInstance(_instid);
1101     (cgproxy.ckLocalBranch())->beginIteration();   
1102 }
1103
1104 void ComlibInstanceHandle::endIteration() {
1105     CProxy_ComlibManager cgproxy(_dmid);
1106     (cgproxy.ckLocalBranch())->endIteration();
1107 }
1108
1109 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1110     toForward = s->getForwardOnMigration();
1111     CProxy_ComlibManager cgproxy(_dmid);
1112     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1113 }
1114
1115 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1116     if(_instid < 0) 
1117         return NULL;    
1118     
1119     CProxy_ComlibManager cgproxy(_dmid);
1120     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1121 }
1122
1123 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1124
1125 void ComlibInitSectionID(CkSectionID &sid){
1126     
1127     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1128     sid._cookie.pe = CkMyPe();
1129     
1130     sid._cookie.sInfo.cInfo.id = 0;    
1131     sid.npes = 0;
1132     sid.pelist = NULL;
1133 }
1134
1135 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1136     CkSectionID &sid = sproxy->ckGetSectionID();
1137     ComlibInitSectionID(sid);
1138     sid._cookie.sInfo.cInfo.status = 0;
1139 }
1140
1141 // for backward compatibility - for old name commlib
1142 void _registercommlib(void)
1143 {
1144   static int _done = 0; 
1145   if(_done) 
1146       return; 
1147   _done = 1;
1148   _registercomlib();
1149 }
1150
1151 void ComlibAtSyncHandler(void *msg) {
1152     CmiFree(msg);
1153     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1154     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1155     if(cmgr_ptr)
1156         cmgr_ptr->AtSync();    
1157 }
1158
1159 void ComlibNotifyMigrationDoneHandler(void *msg) {
1160     CmiFree(msg);
1161     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1162     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1163     if(cmgr_ptr)
1164         cmgr_ptr->AtSync();    
1165 }
1166
1167
1168 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1169     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1170     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1171 }
1172
1173 #include "comlib.def.h"
1174
1175