Code cleanup. Shorter header for streaming and enable array caching again.
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_com_ptr);
22
23 //Handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
27
28     if(msg == NULL)
29         return;
30     
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     int srcPe = env->getSrcPe();
37     int sid = ((CmiMsgHeaderExt *) env)->stratid;
38
39     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
40              sid, env->getTotalsize(), srcPe);
41
42     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
43     
44     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
45     //if(!comm_debug)
46     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
47
48     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
49     return;
50 }
51
52 void recv_combined_array_msg(void *msg){
53     if(msg == NULL)
54         return;
55     
56     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
57
58     MsgPacker::deliver((CombinedMessage *)msg);
59 }
60
61 ComlibManager::ComlibManager(){
62     init();
63     ComlibPrintf("In comlibmanager constructor\n");
64 }
65
66 void ComlibManager::init(){
67     
68     initComlibManager();
69
70     PUPable_reg(CharmStrategy);
71     PUPable_reg(CharmMessageHolder);
72     
73     //comm_debug = 1;
74     
75     numStatsReceived = 0;
76     curComlibController = 0;
77     clibIteration = 0;
78     
79     strategyCreated = CmiFalse;
80
81     CkpvInitialize(ClibLocationTableType*, locationTable);
82     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
83
84     CkpvInitialize(CkArrayIndexMax, cache_index);
85     CkpvInitialize(int, cache_pe);
86     CkpvInitialize(CkArrayID, cache_aid);
87
88     CkpvAccess(cache_index).nInts = -1;
89     CkpvAccess(cache_aid).setZero();
90
91     CkpvInitialize(int, RecvmsgHandle);
92     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
93
94     bcast_pelist = new int [CkNumPes()];
95     for(int brcount = 0; brcount < CkNumPes(); brcount++)
96         bcast_pelist[brcount] = brcount;
97
98     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
99     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
100         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
101     
102     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
103     
104     npes = CkNumPes();
105     pelist = NULL;
106
107     CkpvInitialize(CkGroupID, cmgrID);
108     CkpvAccess(cmgrID) = thisgroup;
109
110     dummyArrayIndex.nInts = 0;
111
112     curStratID = 0;
113     prevStratID = -1;
114     //prioEndIterationFlag = 1;
115
116     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
117     
118     receivedTable = 0;
119     setupComplete = 0;
120
121     barrierReached = 0;
122     barrier2Reached = 0;
123
124     bcount = b2count = 0;
125     lbUpdateReceived = CmiFalse;
126
127     isRemote = 0;
128     remotePe = -1;
129
130     CkpvInitialize(int, migrationDoneHandlerID);
131     CkpvAccess(migrationDoneHandlerID) = 
132         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
133     
134     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
135     cgproxy[curComlibController].barrier();
136 }
137
138 //First barrier makes sure that the communication library group 
139 //has been created on all processors
140 void ComlibManager::barrier(){
141     ComlibPrintf("In barrier %d\n", bcount);
142     if(CkMyPe() == 0) {
143         bcount ++;
144         if(bcount == CkNumPes()){
145             bcount = 0;
146             barrierReached = 1;
147             barrier2Reached = 0;
148
149             if(strategyCreated)
150                 broadcastStrategies();
151         }
152     }
153 }
154
155 //Has finished passing the strategy list to all the processors
156 void ComlibManager::barrier2(){
157     if(CkMyPe() == 0) {
158         b2count ++;
159         ComlibPrintf("In barrier2 %d\n", bcount);
160         if(b2count == CkNumPes()) {
161             b2count = 0; 
162             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
163             cgproxy.resumeFromBarrier2();
164         }
165     }
166 }
167
168 //Registers a set of strategies with the communication library
169 ComlibInstanceHandle ComlibManager::createInstance() {
170   
171     CkpvAccess(conv_com_ptr)->nstrats++;    
172     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
173                                CkpvAccess(cmgrID));  
174     return cinst;
175 }
176
177 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
178     
179     strategyCreated = true;
180
181     ListOfStrategies.enq(strat);
182     strat->setInstance(pos);
183 }
184
185 //End of registering function, if barriers have been reached send them over
186 void ComlibManager::broadcastStrategies() {
187     if(!barrierReached)
188       return;    
189
190     lbUpdateReceived = CmiFalse;
191     barrierReached = 0;
192
193     ComlibPrintf("Sending Strategies %d, %d\n", 
194                  CkpvAccess(conv_com_ptr)->nstrats, 
195                  ListOfStrategies.length());
196
197     StrategyWrapper sw;
198     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
199
200     if(ListOfStrategies.length() > 0) {
201         int len = ListOfStrategies.length();
202         sw.s_table = new Strategy* [len];
203         sw.nstrats = len;
204         
205         for (int count=0; count < len; count++)
206             sw.s_table[count] = ListOfStrategies.deq();
207     }
208     else {
209         sw.nstrats = 0;
210         sw.s_table = 0;
211     }
212
213     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
214     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
215 }
216
217 //Called when the array/group element starts sending messages
218 void ComlibManager::beginIteration(){
219     //right now does not do anything might need later
220     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
221     //prioEndIterationFlag = 0;
222 }
223
224 void ComlibManager::setInstance(int instID){
225
226     curStratID = instID;
227     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
228 }
229
230 //called when the array elements has finished sending messages
231 void ComlibManager::endIteration(){
232     //    prioEndIterationFlag = 1;
233     prevStratID = -1;
234     
235     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
236                  curStratID, 
237                  (* strategyTable)[curStratID].elementCount, 
238                  (* strategyTable)[curStratID].numElements);
239
240     if(isRemote) {
241         isRemote = 0;
242         sendRemote();
243         return;
244     }
245
246     if(!receivedTable) {
247         (* strategyTable)[curStratID].nEndItr++;
248         return;
249     }        
250     
251     (* strategyTable)[curStratID].elementCount++;
252     int count = 0;
253     
254     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
255         
256         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
257         
258         if(barrier2Reached) {       
259             (* strategyTable)[curStratID].strategy->doneInserting();
260         }
261         else (* strategyTable)[curStratID].call_doneInserting = 1;
262         
263         (* strategyTable)[curStratID].elementCount = 0;
264     }
265     ComlibPrintf("After EndIteration\n");
266 }
267
268 //receive the list of strategies
269 //Insert the strategies into the strategy table in converse comm lib.
270 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
271 void ComlibManager::receiveTable(StrategyWrapper &sw, 
272                                  CkHashtableT<ClibGlobalArrayIndex, int> 
273                                  &htable)
274 {
275
276     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
277                  clibIteration);
278
279     receivedTable = 1;
280
281     delete CkpvAccess(locationTable);
282     CkpvAccess(locationTable) =  NULL;
283
284     CkpvAccess(locationTable) = new 
285         CkHashtableT<ClibGlobalArrayIndex, int>;
286
287     CkHashtableIterator *ht_iterator = htable.iterator();
288     ht_iterator->seekStart();
289     while(ht_iterator->hasNext()){
290         ClibGlobalArrayIndex *idx;
291         int *pe;
292         pe = (int *)ht_iterator->next((void **)&idx);
293         
294         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
295                      idx->idx.data()[0], *pe);
296
297         CkpvAccess(locationTable)->put(*idx) = *pe;       
298     }
299     
300     //Reset cached array element index. Location table may have changed
301     CkpvAccess(cache_index).nInts = -1;
302     CkpvAccess(cache_aid).setZero();
303
304     CkArrayID st_aid;
305     int st_nelements;
306     CkArrayIndexMax *st_elem;
307     int temp_curstratid = curStratID;
308
309     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
310     clib_stats.setNstrats(sw.total_nstrats);
311
312     //First recreate strategies
313     int count = 0;
314     for(count = 0; count < sw.nstrats; count ++) {
315         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
316         
317         //set the instance to the current count
318         //currently all strategies are being copied to all processors
319         //later strategies will be selectively copied
320         
321         //location of this strategy table entry in the strategy table
322         int loc = cur_strategy->getInstance();
323         
324         if(loc >= MAX_NUM_STRATS)
325             CkAbort("Strategy table is full \n");
326
327         CharmStrategy *old_strategy;
328
329         //If this is a learning decision and the old strategy has to
330         //be gotten rid of, finalize it here.
331         if((old_strategy = 
332             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
333            != NULL) {
334             old_strategy->finalizeProcessing();
335             
336             //Unregister from array listener if array strategy
337             if(old_strategy->getType() == ARRAY_STRATEGY) {
338                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
339                 as.getSourceArray(st_aid, st_elem, st_nelements);
340
341                 (* strategyTable)[loc].numElements = 0;
342                 if(!st_aid.isZero()) {
343                     ComlibArrayListener *calistener = CkArrayID::
344                         CkLocalBranch(st_aid)->getComlibArrayListener();
345                     
346                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
347                 }
348             }
349         }
350         
351         //Insert strategy, frees an old strategy and sets the
352         //strategy_table entry to point to the new one
353         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
354         
355         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
356
357         if(cur_strategy->getType() == ARRAY_STRATEGY &&
358            cur_strategy->isBracketed()){ 
359
360             ComlibPrintf("Inserting Array Listener\n");
361             
362             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
363             as.getSourceArray(st_aid, st_elem, st_nelements);
364             
365             (* strategyTable)[loc].numElements = 0;
366             if(!st_aid.isZero()) {            
367                 ComlibArrayListener *calistener = 
368                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
369                 
370                 calistener->registerStrategy(&((* strategyTable)[loc]));
371             }
372         }                      
373         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
374           (* strategyTable)[loc].numElements = 1;
375         }
376         
377         (* strategyTable)[loc].elementCount = 0;
378         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
379     }
380
381     //Resume all end iterarions. Newer strategies may have more 
382     //or fewer elements to expect for!!
383     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
384         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
385                      CkMyPe(), count,
386                      (* strategyTable)[count].nEndItr);
387                          
388         curStratID = count;
389         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
390             endIteration();  
391         
392         (* strategyTable)[count].nEndItr = 0;        
393     }           
394     
395     curStratID = temp_curstratid;
396     ComlibPrintf("receivedTable %d\n", sw.nstrats);
397     
398     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
399     cgproxy[curComlibController].barrier2();
400 }
401
402 void ComlibManager::resumeFromBarrier2(){
403
404     setupComplete = 1;
405
406     barrier2Reached = 1;
407     barrierReached = 0;
408
409     clibIteration ++;
410
411     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
412
413     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
414          count ++) {
415         if (!(* strategyTable)[count].tmplist.isEmpty()) {
416             
417             while (!(* strategyTable)[count].tmplist.isEmpty()) {
418                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
419                     (* strategyTable)[count].tmplist.deq();
420                 
421                 if((*strategyTable)[count].strategy->getType() == 
422                    ARRAY_STRATEGY) {
423                     if(cmsg->dest_proc >= 0) {
424                         envelope *env  = UsrToEnv(cmsg->getCharmMessage());
425                         cmsg->dest_proc = getLastKnown
426                             (env->getsetArrayMgr(),
427                              env->getsetArrayIndex());
428                     }
429                     //else multicast
430                 }                                
431                 
432                 if(cmsg->dest_proc == CkMyPe()) {
433                     CmiSyncSendAndFree(CkMyPe(), cmsg->size,
434                                        (char *)
435                                        UsrToEnv(cmsg->getCharmMessage()));
436                     delete cmsg;
437                 }
438                 else
439                     (* strategyTable)[count].strategy->insertMessage(cmsg);
440             }
441         }
442         
443         if ((* strategyTable)[count].call_doneInserting) {
444             (* strategyTable)[count].call_doneInserting = 0;
445             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
446             (* strategyTable)[count].strategy->doneInserting();
447         }
448     }
449     
450     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
451 }
452
453 extern int _charmHandlerIdx;
454
455 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
456                               const CkArrayIndexMax &idx, CkArrayID a){
457     
458     if(pd != NULL) {
459         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
460         setInstance(ci->_instid);
461     }
462     
463     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
464     
465     CkArrayIndexMax myidx = idx;
466     int dest_proc = getLastKnown(a, myidx); 
467     
468     //Reading from two hash tables is a big overhead
469     //int amgr_destpe = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
470     
471     register envelope * env = UsrToEnv(msg);
472     
473     env->getsetArrayMgr()=a;
474     env->getsetArraySrcPe()=CkMyPe();
475     env->getsetArrayEp()=ep;
476     env->getsetArrayHops()=0;
477     env->getsetArrayIndex()=idx;
478     env->setUsed(0);
479     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
480
481     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
482
483     if(isRemote) {
484         CkPackMessage(&env);        
485         CharmMessageHolder *cmsg = new 
486             CharmMessageHolder((char *)msg, dest_proc);
487         
488         remoteQ.enq(cmsg);
489         return;
490     }
491     
492     //With migration some array messages may be directly sent Also no
493     //message processing should happen before the comlib barriers have
494     //gone through
495     if(dest_proc == CkMyPe() && setupComplete){                
496         CkArray *amgr = (CkArray *)_localBranch(a);
497         amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
498         
499         return;
500     }
501
502     CkPackMessage(&env);
503     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
504     
505     //get rid of the new.
506     CharmMessageHolder *cmsg = new 
507         CharmMessageHolder((char *)msg, dest_proc);
508     
509     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, setupComplete);
510     
511     if (setupComplete)        
512         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
513     else 
514         (* strategyTable)[curStratID].tmplist.enq(cmsg);
515     
516     //CmiPrintf("After Insert\n");
517 }
518
519
520 #include "qd.h"
521 //CkpvExtern(QdState*, _qd);
522
523 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
524     
525
526     if(pd != NULL) {
527         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
528         setInstance(ci->_instid);
529     }
530
531     int dest_proc = onPE;
532
533     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
534                  UsrToEnv(msg)->getTotalsize());
535
536     register envelope * env = UsrToEnv(msg);
537     if(dest_proc == CkMyPe() && setupComplete){
538         _SET_USED(env, 0);
539         CkSendMsgBranch(ep, msg, dest_proc, gid);
540         return;
541     }
542     
543     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
544     CkpvAccess(_qd)->create(1);
545
546     env->setMsgtype(ForBocMsg);
547     env->setEpIdx(ep);
548     env->setGroupNum(gid);
549     env->setSrcPe(CkMyPe());
550     env->setUsed(0);
551
552     CkPackMessage(&env);
553     CmiSetHandler(env, _charmHandlerIdx);
554
555     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
556     //get rid of the new.
557     
558     if(setupComplete)
559         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
560     else {
561         (* strategyTable)[curStratID].tmplist.enq(cmsg);
562     }
563 }
564
565 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
566     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
567
568     if(pd != NULL) {
569         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
570         setInstance(ci->_instid);
571     }
572     
573     //Broken, add the processor list here.
574
575     register envelope * env = UsrToEnv(m);
576     env->getsetArrayMgr()=a;
577     env->getsetArraySrcPe()=CkMyPe();
578     env->getsetArrayEp()=ep;
579     env->getsetArrayHops()=0;
580     env->getsetArrayIndex()= dummyArrayIndex;
581     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
582
583     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
584     
585     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
586
587     CharmMessageHolder *cmsg = new 
588         CharmMessageHolder((char *)m, IS_BROADCAST);
589     cmsg->npes = 0;
590     cmsg->pelist = NULL;
591     cmsg->sec_id = NULL;
592
593     multicast(cmsg);
594 }
595
596 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
597                                      CkArrayID a, CkSectionID &s, int opts) {
598
599 #ifndef CMK_OPTIMIZE
600     traceUserEvent(section_send_event);
601 #endif
602
603     if(pd != NULL) {
604         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
605         setInstance(ci->_instid);
606     }
607     
608     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
609
610     register envelope * env = UsrToEnv(m);
611     env->getsetArrayMgr()=a;
612     env->getsetArraySrcPe()=CkMyPe();
613     env->getsetArrayEp()=ep;
614     env->getsetArrayHops()=0;
615     env->getsetArrayIndex()= dummyArrayIndex;
616     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
617
618     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
619     
620     env->setUsed(0);    
621     CkPackMessage(&env);
622     
623     //Provide a dummy dest proc as it does not matter for mulitcast 
624     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
625                                                       IS_SECTION_MULTICAST);
626     cmsg->npes = 0;
627     cmsg->sec_id = &s;
628
629     CkSectionInfo minfo;
630     minfo.type = COMLIB_MULTICAST_MESSAGE;
631     minfo.sInfo.cInfo.instId = curStratID;
632     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
633     minfo.sInfo.cInfo.id = 0; 
634     minfo.pe = CkMyPe();
635     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
636     //    s.npes = 0;
637     //s.pelist = NULL;
638
639     multicast(cmsg);
640 }
641
642 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
643     
644     if(pd != NULL) {
645         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
646         setInstance(ci->_instid);
647     }
648     
649     register envelope * env = UsrToEnv(m);
650     
651     CkpvAccess(_qd)->create(1);
652
653     env->setMsgtype(ForBocMsg);
654     env->setEpIdx(ep);
655     env->setGroupNum(g);
656     env->setSrcPe(CkMyPe());
657     env->setUsed(0);
658     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
659
660     CkPackMessage(&env);
661     CmiSetHandler(env, _charmHandlerIdx);
662     
663     //Provide a dummy dest proc as it does not matter for mulitcast 
664     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
665     
666     cmsg->npes = 0;
667     cmsg->pelist = NULL;
668
669     multicast(cmsg);
670 }
671
672 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
673
674     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
675     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
676
677     env->setUsed(0);    
678     CkPackMessage(&env);
679
680     //Will be used to detect multicast message for learning
681     
682     if (setupComplete)
683         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
684     else {
685         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
686         (* strategyTable)[curStratID].tmplist.enq(cmsg);
687     }
688
689     ComlibPrintf("After multicast\n");
690 }
691
692 //Collect statistics from all the processors, also gets the list of
693 //array elements on each processor.
694 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
695                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
696     
697     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
698
699     numStatsReceived ++;
700     clib_gstats.updateStats(stat, pe);
701     
702     for(int count = 0; count < idx_vec.length(); count++) {
703         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
704         
705         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
706                      pe);
707         
708         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
709     }        
710
711     //Reset cached array element index. Location table may have changed
712     CkpvAccess(cache_index).nInts = -1;
713     CkpvAccess(cache_aid).setZero();
714
715     if(numStatsReceived == CkNumPes()) {
716         numStatsReceived = 0;
717
718         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
719             count++ ){
720             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
721             if(strat->getType() > CONVERSE_STRATEGY) {
722                 CharmStrategy *cstrat = (CharmStrategy *)strat;
723                 ComlibLearner *learner = cstrat->getLearner();
724                 CharmStrategy *newstrat = NULL;
725                                 
726                 if(learner != NULL) {
727                     ComlibPrintf("Calling Learner\n");
728                     newstrat = (CharmStrategy *)learner->optimizePattern
729                         (strat, clib_gstats);
730                     
731                     if(newstrat != NULL)
732                         ListOfStrategies.enq(newstrat);
733                     else
734                         ListOfStrategies.enq(cstrat);
735                 }
736             }
737         }
738         barrierReached = 1;
739         
740         //if(lbUpdateReceived) {
741         //lbUpdateReceived = CmiFalse;
742         broadcastStrategies();
743         //}
744     }
745 }
746
747 void ComlibManager::setRemote(int remote_pe){
748
749     ComlibPrintf("Setting remote flag on\n");
750
751     remotePe = remote_pe;
752     isRemote = 1;
753 }
754
755
756 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
757                                       int strat_id) {
758     setInstance(strat_id);
759     
760     int nmsgs = rq.length();
761
762     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
763
764     for(int count = 0; count < nmsgs; count++) {
765         char *msg = rq.deq()->getCharmMessage();
766         envelope *env = UsrToEnv(msg);
767         
768         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
769                   env->getsetArrayMgr());
770     }
771
772     endIteration();
773 }
774
775 void ComlibManager::sendRemote(){
776     
777     int nmsgs = remoteQ.length();
778
779     //if(nmsgs == 0)
780     //  return;
781
782     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
783
784     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
785     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
786     
787     for(int count = 0; count < nmsgs; count++) {
788         CharmMessageHolder *cmsg = remoteQ.deq();
789         CkFreeMsg(cmsg->getCharmMessage());
790         delete cmsg;
791     }
792 }
793
794
795 void ComlibManager::AtSync() {
796
797     //comm_debug = 1;
798     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
799
800     barrier2Reached = 0;
801     receivedTable = 0;
802     setupComplete = 0;
803     barrierReached = 0;
804     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
805
806     int pos = 0;
807
808     CkVec<ClibGlobalArrayIndex> gidx_vec;
809
810     CkVec<CkArrayID> tmp_vec;
811     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
812         count ++) {
813         if((* strategyTable)[count].strategy->getType() 
814            == ARRAY_STRATEGY) {
815             CharmStrategy *cstrat = (CharmStrategy*)
816                 ((* strategyTable)[count].strategy);
817             
818             CkArrayID src, dest;
819             CkArrayIndexMax *elements;
820             int nelem;
821             
822             cstrat->ainfo.getSourceArray(src, elements, nelem);
823             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
824
825             CmiBool srcflag = CmiFalse;
826             CmiBool destflag = CmiFalse;
827             
828             if(src == dest || dest.isZero())
829                 destflag = CmiTrue;
830
831             if(src.isZero())
832                 srcflag = CmiTrue;                        
833
834             for(pos = 0; pos < tmp_vec.size(); pos++) {
835                 if(tmp_vec[pos] == src)
836                     srcflag = CmiTrue;
837
838                 if(tmp_vec[pos] == dest)
839                     destflag = CmiTrue;
840
841                 if(srcflag && destflag)
842                     break;
843             }
844
845             if(!srcflag)
846                 tmp_vec.insertAtEnd(src);
847
848             if(!destflag)
849                 tmp_vec.insertAtEnd(dest);
850         }
851         
852         //cant do it here, done in receiveTable
853         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
854         //  (* strategyTable)[count].reset();
855     }
856
857     for(pos = 0; pos < tmp_vec.size(); pos++) {
858         CkArrayID aid = tmp_vec[pos];
859
860         ComlibArrayListener *calistener = 
861             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
862
863         CkVec<CkArrayIndexMax> idx_vec;
864         calistener->getLocalIndices(idx_vec);
865
866         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
867             ClibGlobalArrayIndex gindex;
868             gindex.aid = aid;
869             gindex.idx = idx_vec[idx_count];
870
871             gidx_vec.insertAtEnd(gindex);
872         }
873     }
874
875     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
876     clib_stats.reset();
877 }
878
879 #include "lbdb.h"
880 #include "CentralLB.h"
881
882 /******** FOO BAR : NEEDS to be consistent with array manager *******/
883 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
884     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
885         CkAbort("LDB OBJ ID smaller than array index\n");
886     
887     //values higher than CkArrayIndexMax should be 0
888     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
889         idx.data()[count] = ld_id.id[count];
890     }
891     idx.nInts = 1;
892 }
893
894 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
895     for(int count = 0; count < msg->n_moves; count ++) {
896         MigrateInfo m = msg->moves[count];
897
898         CkArrayID aid; CkArrayIndexMax idx;
899         aid = CkArrayID(m.obj.omhandle.id.id);
900         LDObjID2IdxMax(m.obj.id, idx);
901
902         ClibGlobalArrayIndex cid; 
903         cid.aid = aid;
904         cid.idx = idx;
905         
906         int pe = CkpvAccess(locationTable)->get(cid);
907
908         //Value exists in the table, so update it
909         if(pe != 0) {
910             pe = m.to_pe + CkNumPes();
911             CkpvAccess(locationTable)->getRef(cid) = pe;
912         }
913         //otherwise we dont care about these objects
914     }   
915
916     lbUpdateReceived = CmiTrue;
917     if(barrierReached) {
918         broadcastStrategies();
919         barrierReached = 0;
920     }
921
922     CkFreeMsg(msg);
923 }
924
925 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
926     ComlibInstanceHandle *inst = new ComlibInstanceHandle
927         (*((ComlibInstanceHandle *)data));
928     return inst;
929 }
930
931
932 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
933                                                    CkDelegateData *pd) {
934
935     if(pd == NULL)
936         return NULL;
937
938     CmiBool to_pup = CmiFalse;
939
940     ComlibInstanceHandle *inst; 
941     if(!p.isUnpacking()) {
942         inst = (ComlibInstanceHandle *) pd;       
943         if(pd != NULL)
944             to_pup = CmiTrue;
945     }
946     else 
947         //Call migrate constructor
948         inst = new ComlibInstanceHandle();
949
950     p | to_pup;
951
952     if(to_pup)
953         inst->pup(p);
954     return inst;
955 }    
956
957
958 void ComlibDelegateProxy(CProxy *proxy){
959     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
960     proxy->ckDelegate(cgproxy.ckLocalBranch(), NULL);
961 }
962
963 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
964     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
965         (CkGetComlibInstance());
966
967     cinst->setStrategy(strat);
968     
969     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
970     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
971
972
973 void ComlibBegin(CProxy &proxy) {
974     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
975     cinst->beginIteration();
976
977
978 void ComlibEnd(CProxy &proxy) {
979     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
980     cinst->endIteration();
981 }
982
983 ComlibInstanceHandle CkCreateComlibInstance(){
984     return CkGetComlibInstance();
985 }
986
987 ComlibInstanceHandle CkGetComlibInstance() {
988     if(CkMyPe() != 0)
989         CkAbort("Comlib Instance can only be created on Processor 0");
990     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
991     return (cgproxy.ckLocalBranch())->createInstance();
992 }
993
994 ComlibInstanceHandle CkGetComlibInstance(int id) {
995     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
996     return cinst;
997 }
998
999 void ComlibDoneCreating(){
1000     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1001     (cgproxy.ckLocalBranch())->broadcastStrategies();
1002 }
1003
1004 char *router;
1005 int sfactor=0;
1006
1007 class ComlibManagerMain {
1008 public:
1009     ComlibManagerMain(CkArgMsg *msg) {
1010         
1011         if(CkMyPe() == 0 && msg !=  NULL)
1012             CmiGetArgString(msg->argv, "+strategy", &router);         
1013
1014         if(CkMyPe() == 0 && msg !=  NULL)
1015             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1016         
1017         CProxy_ComlibManager::ckNew();
1018     }
1019 };
1020
1021 //Called by user code
1022 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1023     _instid = -1;
1024     _dmid.setZero();
1025     _srcPe = -1;
1026     toForward = 0;
1027 }
1028
1029 //Called by user code
1030 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1031     : CkDelegateData() {
1032     _instid = h._instid;
1033     _dmid = h._dmid;
1034     toForward = h.toForward;        
1035
1036     ComlibPrintf("In Copy Constructor\n");
1037     _srcPe = h._srcPe;
1038
1039     reset();
1040     ref();
1041 }
1042
1043 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1044     _instid = h._instid;
1045     _dmid = h._dmid;
1046     toForward = h.toForward;
1047     _srcPe = h._srcPe;
1048     
1049     reset();
1050     ref();
1051     return *this;
1052 }
1053
1054 //Called by the communication library
1055 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1056     _instid = instid;
1057     _dmid   = dmid;
1058     _srcPe  = -1;
1059     toForward = 0;
1060 }
1061
1062 void ComlibInstanceHandle::beginIteration() { 
1063     CProxy_ComlibManager cgproxy(_dmid);
1064
1065     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1066
1067     //User forgot to make the instance handle a readonly or pass it
1068     //into the constructor of an array and is using it directly from
1069     //Main :: main
1070     if(_srcPe == -1) {
1071         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1072         _srcPe = CkMyPe();
1073     }
1074
1075     if(_srcPe != CkMyPe() && toForward) {
1076         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1077     }
1078
1079     (cgproxy.ckLocalBranch())->setInstance(_instid);
1080     (cgproxy.ckLocalBranch())->beginIteration();   
1081 }
1082
1083 void ComlibInstanceHandle::endIteration() {
1084     CProxy_ComlibManager cgproxy(_dmid);
1085     (cgproxy.ckLocalBranch())->endIteration();
1086 }
1087
1088 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1089     toForward = s->getForwardOnMigration();
1090     CProxy_ComlibManager cgproxy(_dmid);
1091     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1092 }
1093
1094 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1095     if(_instid < 0) 
1096         return NULL;    
1097     
1098     CProxy_ComlibManager cgproxy(_dmid);
1099     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1100 }
1101
1102 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1103
1104 void ComlibInitSectionID(CkSectionID &sid){
1105     
1106     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1107     sid._cookie.pe = CkMyPe();
1108     
1109     sid._cookie.sInfo.cInfo.id = 0;    
1110     sid.npes = 0;
1111     sid.pelist = NULL;
1112 }
1113
1114 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1115     CkSectionID &sid = sproxy->ckGetSectionID();
1116     ComlibInitSectionID(sid);
1117     sid._cookie.sInfo.cInfo.status = 0;
1118 }
1119
1120 // for backward compatibility - for old name commlib
1121 void _registercommlib(void)
1122 {
1123   static int _done = 0; 
1124   if(_done) 
1125       return; 
1126   _done = 1;
1127   _registercomlib();
1128 }
1129
1130 void ComlibAtSyncHandler(void *msg) {
1131     CmiFree(msg);
1132     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1133     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1134     if(cmgr_ptr)
1135         cmgr_ptr->AtSync();    
1136 }
1137
1138 void ComlibNotifyMigrationDoneHandler(void *msg) {
1139     CmiFree(msg);
1140     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1141     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1142     if(cmgr_ptr)
1143         cmgr_ptr->AtSync();    
1144 }
1145
1146
1147 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1148     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1149     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1150 }
1151
1152 #include "comlib.def.h"
1153
1154