3ee55f9d4ad39ad320642db8079760177bf19c04
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
//Per-PE handler index defined in another translation unit (only the
//extern declaration is visible here); presumably the handler for
//dummy messages -- TODO confirm.
CkpvExtern(int, RecvdummyHandle);

//Handler index of recv_array_msg; registered in ComlibManager::init().
CkpvDeclare(int, RecvmsgHandle);
//Handler index of recv_combined_array_msg; registered in
//ComlibManager::init().
CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
//Group id of the ComlibManager group; set to thisgroup in
//ComlibManager::init().
CkpvDeclare(CkGroupID, cmgrID);
//Converse level communication library instance, defined elsewhere.
//This file reads its strategy table and its strategy count (nstrats).
CkpvExtern(ConvComlibManager *, conv_com_ptr);
22
23 //handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
27
28     if(msg == NULL)
29         return;
30     
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     int srcPe = env->getSrcPe();
37     int sid = ((CmiMsgHeaderExt *) env)->stratid;
38
39     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
40              sid, env->getTotalsize(), srcPe);
41
42     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
43     
44     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
45     //if(!comm_debug)
46     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
47
48     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
49     return;
50 }
51
52 void recv_combined_array_msg(void *msg){
53     if(msg == NULL)
54         return;
55     
56     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
57
58     MsgPacker::deliver((CombinedMessage *)msg);
59 }
60
61 ComlibManager::ComlibManager(){
62     init();
63     ComlibPrintf("In comlibmanager constructor\n");
64 }
65
66 void ComlibManager::init(){
67     
68     initComlibManager();
69
70     PUPable_reg(CharmStrategy);
71     PUPable_reg(CharmMessageHolder);
72     
73     //comm_debug = 1;
74     
75     numStatsReceived = 0;
76     curComlibController = 0;
77     clibIteration = 0;
78     
79     strategyCreated = CmiFalse;
80
81     CkpvInitialize(ClibLocationTableType*, locationTable);
82     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
83
84     CkpvInitialize(int, RecvmsgHandle);
85     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
86
87     bcast_pelist = new int [CkNumPes()];
88     for(int brcount = 0; brcount < CkNumPes(); brcount++)
89         bcast_pelist[brcount] = brcount;
90
91     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
92     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
93         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
94     
95     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
96     
97     npes = CkNumPes();
98     pelist = NULL;
99
100     CkpvInitialize(CkGroupID, cmgrID);
101     CkpvAccess(cmgrID) = thisgroup;
102
103     dummyArrayIndex.nInts = 0;
104
105     curStratID = 0;
106     prevStratID = -1;
107     //prioEndIterationFlag = 1;
108
109     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
110     
111     receivedTable = 0;
112     flushTable = 0;
113     //    totalMsgCount = 0;
114     //    totalBytes = 0;
115     //nIterations = 0;
116     barrierReached = 0;
117     barrier2Reached = 0;
118
119     bcount = b2count = 0;
120     lbUpdateReceived = CmiFalse;
121
122     isRemote = 0;
123     remotePe = -1;
124
125     CkpvInitialize(int, migrationDoneHandlerID);
126     CkpvAccess(migrationDoneHandlerID) = 
127         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
128     
129     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
130     cgproxy[curComlibController].barrier();
131 }
132
133 //First barrier makes sure that the communication library group 
134 //has been created on all processors
135 void ComlibManager::barrier(){
136     ComlibPrintf("In barrier %d\n", bcount);
137     if(CkMyPe() == 0) {
138         bcount ++;
139         if(bcount == CkNumPes()){
140             bcount = 0;
141             barrierReached = 1;
142             barrier2Reached = 0;
143
144             if(strategyCreated)
145                 broadcastStrategies();
146         }
147     }
148 }
149
150 //Has finished passing the strategy list to all the processors
151 void ComlibManager::barrier2(){
152     if(CkMyPe() == 0) {
153         b2count ++;
154         ComlibPrintf("In barrier2 %d\n", bcount);
155         if(b2count == CkNumPes()) {
156             b2count = 0; 
157             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
158             cgproxy.resumeFromBarrier2();
159         }
160     }
161 }
162
163 //Registers a set of strategies with the communication library
164 ComlibInstanceHandle ComlibManager::createInstance() {
165   
166     CkpvAccess(conv_com_ptr)->nstrats++;    
167     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
168                                CkpvAccess(cmgrID));  
169     return cinst;
170 }
171
172 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
173     
174     strategyCreated = true;
175
176     ListOfStrategies.enq(strat);
177     strat->setInstance(pos);
178 }
179
180 //End of registering function, if barriers have been reached send them over
181 void ComlibManager::broadcastStrategies() {
182     if(!barrierReached)
183       return;    
184
185     lbUpdateReceived = CmiFalse;
186     barrierReached = 0;
187
188     ComlibPrintf("Sending Strategies %d, %d\n", 
189                  CkpvAccess(conv_com_ptr)->nstrats, 
190                  ListOfStrategies.length());
191
192     StrategyWrapper sw;
193     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
194
195     if(ListOfStrategies.length() > 0) {
196         int len = ListOfStrategies.length();
197         sw.s_table = new Strategy* [len];
198         sw.nstrats = len;
199         
200         for (int count=0; count < len; count++)
201             sw.s_table[count] = ListOfStrategies.deq();
202     }
203     else {
204         sw.nstrats = 0;
205         sw.s_table = 0;
206     }
207
208     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
209     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
210 }
211
212 //Called when the array/group element starts sending messages
213 void ComlibManager::beginIteration(){
214     //right now does not do anything might need later
215     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
216     //prioEndIterationFlag = 0;
217 }
218
219 void ComlibManager::setInstance(int instID){
220
221     curStratID = instID;
222     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
223 }
224
225 //called when the array elements has finished sending messages
226 void ComlibManager::endIteration(){
227     //    prioEndIterationFlag = 1;
228     prevStratID = -1;
229     
230     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
231                  curStratID, 
232                  (* strategyTable)[curStratID].elementCount, 
233                  (* strategyTable)[curStratID].numElements);
234
235     if(isRemote) {
236         isRemote = 0;
237         sendRemote();
238         return;
239     }
240
241     if(!receivedTable) {
242         (* strategyTable)[curStratID].nEndItr++;
243         return;
244     }        
245     
246     (* strategyTable)[curStratID].elementCount++;
247     int count = 0;
248     flushTable = 1;
249     
250     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
251         
252         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
253         
254         //nIterations ++;
255         /*
256         if(nIterations == LEARNING_PERIOD) {
257             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
258             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
259             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
260         }
261         */
262         
263         if(barrier2Reached) {       
264             (* strategyTable)[curStratID].strategy->doneInserting();
265         }
266         else (* strategyTable)[curStratID].call_doneInserting = 1;
267         
268         (* strategyTable)[curStratID].elementCount = 0;
269     }
270     ComlibPrintf("After EndIteration\n");
271 }
272
273 //receive the list of strategies
274 //Insert the strategies into the strategy table in converse comm lib.
275 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
276 void ComlibManager::receiveTable(StrategyWrapper &sw, 
277                                  CkHashtableT<ClibGlobalArrayIndex, int> 
278                                  &htable)
279 {
280
281     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
282                  clibIteration);
283
284     clibIteration ++;
285     receivedTable = 1;
286
287     delete CkpvAccess(locationTable);
288     CkpvAccess(locationTable) =  NULL;
289
290     CkpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
291
292     CkHashtableIterator *ht_iterator = htable.iterator();
293     ht_iterator->seekStart();
294     while(ht_iterator->hasNext()){
295         ClibGlobalArrayIndex *idx;
296         int *pe;
297         pe = (int *)ht_iterator->next((void **)&idx);
298         
299         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
300                      idx->idx.data()[0], *pe);
301
302         CkpvAccess(locationTable)->put(*idx) = *pe;       
303     }
304
305     CkArrayID st_aid;
306     int st_nelements;
307     CkArrayIndexMax *st_elem;
308     int temp_curstratid = curStratID;
309
310     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
311     clib_stats.setNstrats(sw.total_nstrats);
312
313     //First recreate strategies
314     int count = 0;
315     for(count = 0; count < sw.nstrats; count ++) {
316         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
317         
318         //set the instance to the current count
319         //currently all strategies are being copied to all processors
320         //later strategies will be selectively copied
321         
322         //location of this strategy table entry in the strategy table
323         int loc = cur_strategy->getInstance();
324         
325         if(loc >= MAX_NUM_STRATS)
326             CkAbort("Strategy table is full \n");
327
328         CharmStrategy *old_strategy;
329
330         //If this is a learning decision and the old strategy has to
331         //be gotten rid of, finalize it here.
332         if((old_strategy = 
333             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
334            != NULL) {
335             old_strategy->finalizeProcessing();
336
337             //Unregister from array listener if array strategy
338             if(old_strategy->getType() == ARRAY_STRATEGY) {
339                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
340                 as.getSourceArray(st_aid, st_elem, st_nelements);
341
342                 (* strategyTable)[loc].numElements = 0;
343                 if(!st_aid.isZero()) {
344                     ComlibArrayListener *calistener = CkArrayID::
345                         CkLocalBranch(st_aid)->getComlibArrayListener();
346                     
347                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
348                 }
349             }
350         }
351         
352         //Insert strategy, frees an old strategy and sets the
353         //strategy_table entry to point to the new one
354         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
355         
356         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
357
358         if(cur_strategy->getType() == ARRAY_STRATEGY &&
359            cur_strategy->isBracketed()){ 
360
361             ComlibPrintf("Inserting Array Listener\n");
362
363             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
364             as.getSourceArray(st_aid, st_elem, st_nelements);
365             
366             (* strategyTable)[loc].numElements = 0;
367             if(!st_aid.isZero()) {            
368                 ComlibArrayListener *calistener = 
369                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
370                 
371                 calistener->registerStrategy(&((* strategyTable)[loc]));
372             }
373         }              
374         
375         if(cur_strategy->getType() == GROUP_STRATEGY){
376             (* strategyTable)[loc].numElements = 1;
377         }
378         
379         (* strategyTable)[loc].elementCount = 0;
380         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
381     }
382
383     //Resume all end iterarions. Newer strategies may have more 
384     //or fewer elements to expect for!!
385     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
386         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
387                      CkMyPe(), count,
388                      (* strategyTable)[count].nEndItr);
389                          
390         curStratID = count;
391         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
392             endIteration();  
393         
394         (* strategyTable)[count].nEndItr = 0;        
395     }           
396     
397     curStratID = temp_curstratid;
398     ComlibPrintf("receivedTable %d\n", sw.nstrats);
399     
400     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
401     cgproxy[curComlibController].barrier2();
402 }
403
404 void ComlibManager::resumeFromBarrier2(){
405     barrier2Reached = 1;
406     barrierReached = 0;
407
408     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
409
410     //    if(flushTable) {
411     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
412         if (!(* strategyTable)[count].tmplist.isEmpty()) {
413             CharmMessageHolder *cptr;
414             while (!(* strategyTable)[count].tmplist.isEmpty()) {
415                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
416                     (* strategyTable)[count].tmplist.deq();
417                 
418                 if((*strategyTable)[count].strategy->getType() == 
419                    ARRAY_STRATEGY) {
420                     if(cmsg->dest_proc >= 0) {
421                         envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
422                         cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
423                                                        env->getsetArrayIndex());
424                     }
425                     //else
426                     //  CkAbort("NOT FIXED YET\n");                    
427                 }                                
428                 (* strategyTable)[count].strategy->insertMessage(cmsg);
429             }
430         }
431         
432         if ((* strategyTable)[count].call_doneInserting) {
433             (* strategyTable)[count].call_doneInserting = 0;
434             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
435             (* strategyTable)[count].strategy->doneInserting();
436         }
437     }
438     //}    
439     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
440 }
441
442 extern int _charmHandlerIdx;
443
444 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
445                               const CkArrayIndexMax &idx, CkArrayID a){
446     
447     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
448
449     CkArrayIndexMax myidx = idx;
450     int dest_proc = getLastKnown(a, myidx); 
451     //CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
452     
453     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
454     //   UsrToEnv(msg)->getTotalsize(), receivedTable);
455
456     register envelope * env = UsrToEnv(msg);
457     
458     env->getsetArrayMgr()=a;
459     env->getsetArraySrcPe()=CkMyPe();
460     env->getsetArrayEp()=ep;
461     env->getsetArrayHops()=0;
462     env->getsetArrayIndex()=idx;
463     env->setUsed(0);
464     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
465
466     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
467
468     CkPackMessage(&env);
469     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
470     
471     if(isRemote) {
472         CharmMessageHolder *cmsg = new 
473             CharmMessageHolder((char *)msg, dest_proc);
474
475         remoteQ.enq(cmsg);
476         return;
477     }
478
479     /*
480     if(dest_proc == CkMyPe()){
481         CProxyElement_ArrayBase ap(a,idx);
482         ap.ckSend((CkArrayMessage *)msg, ep);
483         return;
484     }
485     */
486
487     //totalMsgCount ++;
488     //totalBytes += UsrToEnv(msg)->getTotalsize();
489
490     CharmMessageHolder *cmsg = new 
491         CharmMessageHolder((char *)msg, dest_proc);
492     //get rid of the new.
493
494     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
495
496     if (receivedTable)
497       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
498     else {
499         flushTable = 1;
500         (* strategyTable)[curStratID].tmplist.enq(cmsg);
501     }
502
503     //CmiPrintf("After Insert\n");
504 }
505
506
507 #include "qd.h"
508 //CkpvExtern(QdState*, _qd);
509
510 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
511     
512     int dest_proc = onPE;
513     /*
514     if(curStratID != prevStratID && prioEndIterationFlag) {        
515         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
516         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
517                      CkMyPe());
518         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg;
519         *(int *)CkPriorityPtr(pmsg) = -0x7FFFFFFF;
520         CkSetQueueing(pmsg, CK_QUEUEING_IFIFO);
521         cgproxy[CkMyPe()].prioEndIteration(pmsg);
522         prioEndIterationFlag = 0;
523     }        
524     prevStratID = curStratID;            
525     */
526
527     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
528                  UsrToEnv(msg)->getTotalsize());
529
530     register envelope * env = UsrToEnv(msg);
531     if(dest_proc == CkMyPe()){
532         _SET_USED(env, 0);
533         CkSendMsgBranch(ep, msg, dest_proc, gid);
534         return;
535     }
536     
537     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
538     CpvAccess(_qd)->create(1);
539
540     env->setMsgtype(ForBocMsg);
541     env->setEpIdx(ep);
542     env->setGroupNum(gid);
543     env->setSrcPe(CkMyPe());
544     env->setUsed(0);
545
546     CkPackMessage(&env);
547     CmiSetHandler(env, _charmHandlerIdx);
548
549     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
550     //get rid of the new.
551     
552     if(receivedTable)
553         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
554     else {
555         flushTable = 1;
556         (* strategyTable)[curStratID].tmplist.enq(cmsg);
557     }
558 }
559
560 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
561     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
562
563     //Broken, add the processor list here.
564
565     register envelope * env = UsrToEnv(m);
566     env->getsetArrayMgr()=a;
567     env->getsetArraySrcPe()=CkMyPe();
568     env->getsetArrayEp()=ep;
569     env->getsetArrayHops()=0;
570     env->getsetArrayIndex()= dummyArrayIndex;
571     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
572
573     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
574
575     CkSectionInfo minfo;
576     minfo.type = COMLIB_MULTICAST_MESSAGE;
577     minfo.sInfo.cInfo.instId = curStratID;
578     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
579     minfo.sInfo.cInfo.id = 0; 
580     minfo.pe = CkMyPe();
581     ((CkMcastBaseMsg *)m)->_cookie = minfo;       
582
583     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
584
585     CharmMessageHolder *cmsg = new 
586         CharmMessageHolder((char *)m, IS_BROADCAST);
587     cmsg->npes = 0;
588     cmsg->pelist = NULL;
589     cmsg->sec_id = NULL;
590
591     multicast(cmsg);
592 }
593
594 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
595                                      CkArrayID a, CkSectionID &s) {
596
597 #ifndef CMK_OPTIMIZE
598     traceUserEvent(section_send_event);
599 #endif
600
601     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
602
603     register envelope * env = UsrToEnv(m);
604     env->getsetArrayMgr()=a;
605     env->getsetArraySrcPe()=CkMyPe();
606     env->getsetArrayEp()=ep;
607     env->getsetArrayHops()=0;
608     env->getsetArrayIndex()= dummyArrayIndex;
609     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
610
611     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
612     
613     env->setUsed(0);    
614     CkPackMessage(&env);
615     
616     //totalMsgCount ++;
617     //totalBytes += env->getTotalsize();
618
619     //Provide a dummy dest proc as it does not matter for mulitcast 
620     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
621                                                       IS_SECTION_MULTICAST);
622     cmsg->npes = 0;
623     cmsg->sec_id = &s;
624
625     CkSectionInfo minfo;
626     minfo.type = COMLIB_MULTICAST_MESSAGE;
627     minfo.sInfo.cInfo.instId = curStratID;
628     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
629     minfo.sInfo.cInfo.id = 0; 
630     minfo.pe = CkMyPe();
631     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
632     //    s.npes = 0;
633     //s.pelist = NULL;
634
635     multicast(cmsg);
636 }
637
638 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
639     register envelope * env = UsrToEnv(m);
640
641     CpvAccess(_qd)->create(1);
642
643     env->setMsgtype(ForBocMsg);
644     env->setEpIdx(ep);
645     env->setGroupNum(g);
646     env->setSrcPe(CkMyPe());
647     env->setUsed(0);
648     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
649
650     CkPackMessage(&env);
651     CmiSetHandler(env, _charmHandlerIdx);
652     
653     //Provide a dummy dest proc as it does not matter for mulitcast 
654     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
655     
656     cmsg->npes = 0;
657     cmsg->pelist = NULL;
658
659     multicast(cmsg);
660 }
661
662 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
663
664     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
665     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
666
667     env->setUsed(0);    
668     CkPackMessage(&env);
669
670     //Will be used to detect multicast message for learning
671     //totalMsgCount ++;
672     //totalBytes += env->getTotalsize();
673     
674     if (receivedTable)
675         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
676     else {
677         flushTable = 1;
678         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
679         (* strategyTable)[curStratID].tmplist.enq(cmsg);
680     }
681
682     ComlibPrintf("After multicast\n");
683 }
684
685 //Collect statistics from all the processors, also gets the list of
686 //array elements on each processor.
687 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
688                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
689     
690     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
691
692     numStatsReceived ++;
693     clib_gstats.updateStats(stat, pe);
694     
695     for(int count = 0; count < idx_vec.length(); count++) {
696         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
697         
698         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
699                      pe);
700         
701         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
702     }        
703
704     if(numStatsReceived == CkNumPes()) {
705         numStatsReceived = 0;
706
707         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
708             count++ ){
709             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
710             if(strat->getType() > CONVERSE_STRATEGY) {
711                 CharmStrategy *cstrat = (CharmStrategy *)strat;
712                 ComlibLearner *learner = cstrat->getLearner();
713                 CharmStrategy *newstrat = NULL;
714                                 
715                 if(learner != NULL) {
716                     ComlibPrintf("Calling Learner\n");
717                     newstrat = (CharmStrategy *)learner->optimizePattern(strat, clib_gstats);
718                     if(newstrat != NULL)
719                         ListOfStrategies.enq(newstrat);
720                 }
721             }
722         }
723         barrierReached = 1;
724         
725         //if(lbUpdateReceived) {
726         //lbUpdateReceived = CmiFalse;
727         broadcastStrategies();
728         //}
729     }
730 }
731
732 void ComlibManager::setRemote(int remote_pe){
733
734     ComlibPrintf("Setting remote flag on\n");
735
736     remotePe = remote_pe;
737     isRemote = 1;
738 }
739
740
741 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
742                                       int strat_id) {
743     setInstance(strat_id);
744     
745     int nmsgs = rq.length();
746
747     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
748
749     for(int count = 0; count < nmsgs; count++) {
750         char *msg = rq.deq()->getCharmMessage();
751         envelope *env = UsrToEnv(msg);
752         
753         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
754                   env->getsetArrayMgr());
755     }
756
757     endIteration();
758 }
759
760 void ComlibManager::sendRemote(){
761     
762     int nmsgs = remoteQ.length();
763
764     //if(nmsgs == 0)
765     //  return;
766
767     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
768
769     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
770     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
771     
772     for(int count = 0; count < nmsgs; count++) {
773         CharmMessageHolder *cmsg = remoteQ.deq();
774         CkFreeMsg(cmsg->getCharmMessage());
775         delete cmsg;
776     }
777 }
778
779
780 void ComlibManager::AtSync() {
781
782     //comm_debug = 1;
783     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
784
785     barrier2Reached = 0;
786     receivedTable = 0;
787     barrierReached = 0;
788     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
789
790     int pos = 0;
791
792     CkVec<ClibGlobalArrayIndex> gidx_vec;
793
794     CkVec<CkArrayID> tmp_vec;
795     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
796         if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
797             CharmStrategy *cstrat = (CharmStrategy*)
798                 ((* strategyTable)[count].strategy);
799             
800             CkArrayID src, dest;
801             CkArrayIndexMax *elements;
802             int nelem;
803             
804             cstrat->ainfo.getSourceArray(src, elements, nelem);
805             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
806
807             CmiBool srcflag = CmiFalse;
808             CmiBool destflag = CmiFalse;
809             
810             if(src == dest || dest.isZero())
811                 destflag = CmiTrue;
812
813             if(src.isZero())
814                 srcflag = CmiTrue;                        
815
816             for(pos = 0; pos < tmp_vec.size(); pos++) {
817                 if(tmp_vec[pos] == src)
818                     srcflag = CmiTrue;
819
820                 if(tmp_vec[pos] == dest)
821                     destflag = CmiTrue;
822
823                 if(srcflag && destflag)
824                     break;
825             }
826
827             if(!srcflag)
828                 tmp_vec.insertAtEnd(src);
829
830             if(!destflag)
831                 tmp_vec.insertAtEnd(dest);
832         }
833         
834         //cant do it here, done in receiveTable
835         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
836         //  (* strategyTable)[count].reset();
837     }
838
839     for(pos = 0; pos < tmp_vec.size(); pos++) {
840         CkArrayID aid = tmp_vec[pos];
841
842         ComlibArrayListener *calistener = 
843             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
844
845         CkVec<CkArrayIndexMax> idx_vec;
846         calistener->getLocalIndices(idx_vec);
847
848         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
849             ClibGlobalArrayIndex gindex;
850             gindex.aid = aid;
851             gindex.idx = idx_vec[idx_count];
852
853             gidx_vec.insertAtEnd(gindex);
854         }
855     }
856
857     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
858     clib_stats.reset();
859 }
860
861 #include "lbdb.h"
862 #include "CentralLB.h"
863
864 /******** FOO BAR : NEEDS to be consistent with array manager *******/
865 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
866     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
867         CkAbort("LDB OBJ ID smaller than array index\n");
868     
869     //values higher than CkArrayIndexMax should be 0
870     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
871         idx.data()[count] = ld_id.id[count];
872     }
873     idx.nInts = 1;
874 }
875
876 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
877     for(int count = 0; count < msg->n_moves; count ++) {
878         MigrateInfo m = msg->moves[count];
879
880         CkArrayID aid; CkArrayIndexMax idx;
881         aid = CkArrayID(m.obj.omhandle.id.id);
882         LDObjID2IdxMax(m.obj.id, idx);
883
884         ClibGlobalArrayIndex cid; 
885         cid.aid = aid;
886         cid.idx = idx;
887         
888         int pe = CkpvAccess(locationTable)->get(cid);
889
890         //Value exists in the table, so update it
891         if(pe != 0) {
892             pe = m.to_pe + CkNumPes();
893             CkpvAccess(locationTable)->getRef(cid) = pe;
894         }
895         //otherwise we dont care about these objects
896     }   
897
898     lbUpdateReceived = CmiTrue;
899     if(barrierReached) {
900         broadcastStrategies();
901         barrierReached = 0;
902     }
903
904     CkFreeMsg(msg);
905 }
906
907
908 void ComlibDelegateProxy(CProxy *proxy){
909     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
910     proxy->ckDelegate(cgproxy.ckLocalBranch());
911 }
912
913 ComlibInstanceHandle CkCreateComlibInstance(){
914     return CkGetComlibInstance();
915 }
916
917 ComlibInstanceHandle CkGetComlibInstance() {
918     if(CkMyPe() != 0)
919         CkAbort("Comlib Instance can only be created on Processor 0");
920     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
921     return (cgproxy.ckLocalBranch())->createInstance();
922 }
923
924 ComlibInstanceHandle CkGetComlibInstance(int id) {
925     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
926     return cinst;
927 }
928
929 void ComlibDoneCreating(){
930     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
931     (cgproxy.ckLocalBranch())->broadcastStrategies();
932 }
933
//Argument of the "+strategy" command line option; read by
//ComlibManagerMain on processor 0.
char *router;
//Argument of the "+spanning_factor" command line option; read by
//ComlibManagerMain on processor 0, 0 until then.
int sfactor=0;
936
937 class ComlibManagerMain {
938 public:
939     ComlibManagerMain(CkArgMsg *msg) {
940         
941         if(CkMyPe() == 0 && msg !=  NULL)
942             CmiGetArgString(msg->argv, "+strategy", &router);         
943
944         if(CkMyPe() == 0 && msg !=  NULL)
945             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
946         
947         CProxy_ComlibManager::ckNew();
948     }
949 };
950
951 //Called by user code
952 ComlibInstanceHandle::ComlibInstanceHandle(){
953     _instid = -1;
954     _dmid.setZero();
955     _srcPe = -1;
956     toForward = 0;
957 }
958
959 //Called by user code
960 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h){
961     _instid = h._instid;
962     _dmid = h._dmid;
963     toForward = h.toForward;
964
965     ComlibPrintf("In Copy Constructor\n");
966
967     //We DO NOT copy the source processor
968     //Source PE is initialized here
969     _srcPe = h._srcPe;
970 }
971
972 void ComlibInstanceHandle::init(){
973     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
974     *this = (cgproxy.ckLocalBranch())->createInstance();
975 }
976
977 //Called by the communication library
978 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
979     _instid = instid;
980     _dmid   = dmid;
981     _srcPe  = -1;
982     toForward = 0;
983 }
984
985 void ComlibInstanceHandle::beginIteration() { 
986     CProxy_ComlibManager cgproxy(_dmid);
987
988     ComlibPrintf("Instance Handle beginIteration %d, %d\n", CkMyPe(), _srcPe);
989
990     //User forgot to make the instance handle a readonly or pass it
991     //into the constructor of an array and is using it directly from
992     //Main :: main
993     if(_srcPe == -1) {
994         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
995         _srcPe = CkMyPe();
996     }
997
998     if(_srcPe != CkMyPe() && toForward) {
999         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1000     }
1001
1002     (cgproxy.ckLocalBranch())->setInstance(_instid);
1003     (cgproxy.ckLocalBranch())->beginIteration();   
1004 }
1005
1006 void ComlibInstanceHandle::endIteration() {
1007     CProxy_ComlibManager cgproxy(_dmid);
1008     (cgproxy.ckLocalBranch())->endIteration();
1009 }
1010
1011 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1012     toForward = s->getForwardOnMigration();
1013     CProxy_ComlibManager cgproxy(_dmid);
1014     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1015 }
1016
1017 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1018     if(_instid < 0) 
1019         return NULL;    
1020     
1021     CProxy_ComlibManager cgproxy(_dmid);
1022     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1023 }
1024
1025 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1026
1027 void ComlibInitSectionID(CkSectionID &sid){
1028     
1029     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1030     sid._cookie.pe = CkMyPe();
1031     
1032     sid._cookie.sInfo.cInfo.id = 0;    
1033     sid.npes = 0;
1034     sid.pelist = NULL;
1035 }
1036
1037 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1038     CkSectionID &sid = sproxy->ckGetSectionID();
1039     ComlibInitSectionID(sid);
1040     sid._cookie.sInfo.cInfo.status = 0;
1041 }
1042
1043 // for backward compatibility - for old name commlib
1044 void _registercommlib(void)
1045 {
1046   static int _done = 0; 
1047   if(_done) 
1048       return; 
1049   _done = 1;
1050   _registercomlib();
1051 }
1052
1053 void ComlibAtSyncHandler(void *msg) {
1054     CmiFree(msg);
1055     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1056     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1057     if(cmgr_ptr)
1058         cmgr_ptr->AtSync();    
1059 }
1060
1061 void ComlibNotifyMigrationDoneHandler(void *msg) {
1062     CmiFree(msg);
1063     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1064     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1065     if(cmgr_ptr)
1066         cmgr_ptr->AtSync();    
1067 }
1068
1069
1070 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1071     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1072     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1073 }
1074
1075 #include "comlib.def.h"
1076
1077