eff6bfc79aaf63cac3f14e5c6714921cb1dca9bc
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_com_ptr);
22
23 //handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
27
28     if(msg == NULL)
29         return;
30     
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     int srcPe = env->getSrcPe();
37     int sid = ((CmiMsgHeaderExt *) env)->stratid;
38
39     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
40              sid, env->getTotalsize(), srcPe);
41
42     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
43     
44     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
45     //if(!comm_debug)
46     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
47
48     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
49     return;
50 }
51
52 void recv_combined_array_msg(void *msg){
53     if(msg == NULL)
54         return;
55     
56     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
57
58     MsgPacker::deliver((CombinedMessage *)msg);
59 }
60
61 ComlibManager::ComlibManager(){
62     init();
63     ComlibPrintf("In comlibmanager constructor\n");
64 }
65
66 void ComlibManager::init(){
67     
68     initComlibManager();
69
70     PUPable_reg(CharmStrategy);
71     PUPable_reg(CharmMessageHolder);
72     
73     //comm_debug = 1;
74     
75     numStatsReceived = 0;
76     curComlibController = 0;
77     clibIteration = 0;
78     
79     strategyCreated = CmiFalse;
80
81     CkpvInitialize(ClibLocationTableType*, locationTable);
82     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
83
84     CkpvInitialize(CkArrayIndexMax, cache_index);
85     CkpvInitialize(int, cache_pe);
86     CkpvInitialize(CkArrayID, cache_aid);
87
88     CkpvAccess(cache_index).nInts = -1;
89     CkpvAccess(cache_aid).setZero();
90
91     CkpvInitialize(int, RecvmsgHandle);
92     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
93
94     bcast_pelist = new int [CkNumPes()];
95     for(int brcount = 0; brcount < CkNumPes(); brcount++)
96         bcast_pelist[brcount] = brcount;
97
98     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
99     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
100         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
101     
102     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
103     
104     npes = CkNumPes();
105     pelist = NULL;
106
107     CkpvInitialize(CkGroupID, cmgrID);
108     CkpvAccess(cmgrID) = thisgroup;
109
110     dummyArrayIndex.nInts = 0;
111
112     curStratID = 0;
113     prevStratID = -1;
114     //prioEndIterationFlag = 1;
115
116     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
117     
118     receivedTable = 0;
119     flushTable = 0;
120     //    totalMsgCount = 0;
121     //    totalBytes = 0;
122     //nIterations = 0;
123     barrierReached = 0;
124     barrier2Reached = 0;
125
126     bcount = b2count = 0;
127     lbUpdateReceived = CmiFalse;
128
129     isRemote = 0;
130     remotePe = -1;
131
132     CkpvInitialize(int, migrationDoneHandlerID);
133     CkpvAccess(migrationDoneHandlerID) = 
134         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
135     
136     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
137     cgproxy[curComlibController].barrier();
138 }
139
140 //First barrier makes sure that the communication library group 
141 //has been created on all processors
142 void ComlibManager::barrier(){
143     ComlibPrintf("In barrier %d\n", bcount);
144     if(CkMyPe() == 0) {
145         bcount ++;
146         if(bcount == CkNumPes()){
147             bcount = 0;
148             barrierReached = 1;
149             barrier2Reached = 0;
150
151             if(strategyCreated)
152                 broadcastStrategies();
153         }
154     }
155 }
156
157 //Has finished passing the strategy list to all the processors
158 void ComlibManager::barrier2(){
159     if(CkMyPe() == 0) {
160         b2count ++;
161         ComlibPrintf("In barrier2 %d\n", bcount);
162         if(b2count == CkNumPes()) {
163             b2count = 0; 
164             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
165             cgproxy.resumeFromBarrier2();
166         }
167     }
168 }
169
170 //Registers a set of strategies with the communication library
171 ComlibInstanceHandle ComlibManager::createInstance() {
172   
173     CkpvAccess(conv_com_ptr)->nstrats++;    
174     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
175                                CkpvAccess(cmgrID));  
176     return cinst;
177 }
178
179 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
180     
181     strategyCreated = true;
182
183     ListOfStrategies.enq(strat);
184     strat->setInstance(pos);
185 }
186
187 //End of registering function, if barriers have been reached send them over
188 void ComlibManager::broadcastStrategies() {
189     if(!barrierReached)
190       return;    
191
192     lbUpdateReceived = CmiFalse;
193     barrierReached = 0;
194
195     ComlibPrintf("Sending Strategies %d, %d\n", 
196                  CkpvAccess(conv_com_ptr)->nstrats, 
197                  ListOfStrategies.length());
198
199     StrategyWrapper sw;
200     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
201
202     if(ListOfStrategies.length() > 0) {
203         int len = ListOfStrategies.length();
204         sw.s_table = new Strategy* [len];
205         sw.nstrats = len;
206         
207         for (int count=0; count < len; count++)
208             sw.s_table[count] = ListOfStrategies.deq();
209     }
210     else {
211         sw.nstrats = 0;
212         sw.s_table = 0;
213     }
214
215     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
216     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
217 }
218
219 //Called when the array/group element starts sending messages
220 void ComlibManager::beginIteration(){
221     //right now does not do anything might need later
222     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
223     //prioEndIterationFlag = 0;
224 }
225
226 void ComlibManager::setInstance(int instID){
227
228     curStratID = instID;
229     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
230 }
231
232 //called when the array elements has finished sending messages
233 void ComlibManager::endIteration(){
234     //    prioEndIterationFlag = 1;
235     prevStratID = -1;
236     
237     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
238                  curStratID, 
239                  (* strategyTable)[curStratID].elementCount, 
240                  (* strategyTable)[curStratID].numElements);
241
242     if(isRemote) {
243         isRemote = 0;
244         sendRemote();
245         return;
246     }
247
248     if(!receivedTable) {
249         (* strategyTable)[curStratID].nEndItr++;
250         return;
251     }        
252     
253     (* strategyTable)[curStratID].elementCount++;
254     int count = 0;
255     flushTable = 1;
256     
257     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
258         
259         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
260         
261         //nIterations ++;
262         /*
263         if(nIterations == LEARNING_PERIOD) {
264             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
265             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
266             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
267         }
268         */
269         
270         if(barrier2Reached) {       
271             (* strategyTable)[curStratID].strategy->doneInserting();
272         }
273         else (* strategyTable)[curStratID].call_doneInserting = 1;
274         
275         (* strategyTable)[curStratID].elementCount = 0;
276     }
277     ComlibPrintf("After EndIteration\n");
278 }
279
280 //receive the list of strategies
281 //Insert the strategies into the strategy table in converse comm lib.
282 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
283 void ComlibManager::receiveTable(StrategyWrapper &sw, 
284                                  CkHashtableT<ClibGlobalArrayIndex, int> 
285                                  &htable)
286 {
287
288     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
289                  clibIteration);
290
291     clibIteration ++;
292     receivedTable = 1;
293
294     delete CkpvAccess(locationTable);
295     CkpvAccess(locationTable) =  NULL;
296
297     CkpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
298
299     CkHashtableIterator *ht_iterator = htable.iterator();
300     ht_iterator->seekStart();
301     while(ht_iterator->hasNext()){
302         ClibGlobalArrayIndex *idx;
303         int *pe;
304         pe = (int *)ht_iterator->next((void **)&idx);
305         
306         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
307                      idx->idx.data()[0], *pe);
308
309         CkpvAccess(locationTable)->put(*idx) = *pe;       
310     }
311
312     CkArrayID st_aid;
313     int st_nelements;
314     CkArrayIndexMax *st_elem;
315     int temp_curstratid = curStratID;
316
317     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
318     clib_stats.setNstrats(sw.total_nstrats);
319
320     //First recreate strategies
321     int count = 0;
322     for(count = 0; count < sw.nstrats; count ++) {
323         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
324         
325         //set the instance to the current count
326         //currently all strategies are being copied to all processors
327         //later strategies will be selectively copied
328         
329         //location of this strategy table entry in the strategy table
330         int loc = cur_strategy->getInstance();
331         
332         if(loc >= MAX_NUM_STRATS)
333             CkAbort("Strategy table is full \n");
334
335         CharmStrategy *old_strategy;
336
337         //If this is a learning decision and the old strategy has to
338         //be gotten rid of, finalize it here.
339         if((old_strategy = 
340             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
341            != NULL) {
342             old_strategy->finalizeProcessing();
343
344             //Unregister from array listener if array strategy
345             if(old_strategy->getType() == ARRAY_STRATEGY) {
346                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
347                 as.getSourceArray(st_aid, st_elem, st_nelements);
348
349                 (* strategyTable)[loc].numElements = 0;
350                 if(!st_aid.isZero()) {
351                     ComlibArrayListener *calistener = CkArrayID::
352                         CkLocalBranch(st_aid)->getComlibArrayListener();
353                     
354                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
355                 }
356             }
357         }
358         
359         //Insert strategy, frees an old strategy and sets the
360         //strategy_table entry to point to the new one
361         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
362         
363         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
364
365         if(cur_strategy->getType() == ARRAY_STRATEGY &&
366            cur_strategy->isBracketed()){ 
367
368             ComlibPrintf("Inserting Array Listener\n");
369
370             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
371             as.getSourceArray(st_aid, st_elem, st_nelements);
372             
373             (* strategyTable)[loc].numElements = 0;
374             if(!st_aid.isZero()) {            
375                 ComlibArrayListener *calistener = 
376                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
377                 
378                 calistener->registerStrategy(&((* strategyTable)[loc]));
379             }
380         }                      
381         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
382           (* strategyTable)[loc].numElements = 1;
383         }
384         
385         (* strategyTable)[loc].elementCount = 0;
386         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
387     }
388
389     //Resume all end iterarions. Newer strategies may have more 
390     //or fewer elements to expect for!!
391     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
392         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
393                      CkMyPe(), count,
394                      (* strategyTable)[count].nEndItr);
395                          
396         curStratID = count;
397         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
398             endIteration();  
399         
400         (* strategyTable)[count].nEndItr = 0;        
401     }           
402     
403     curStratID = temp_curstratid;
404     ComlibPrintf("receivedTable %d\n", sw.nstrats);
405     
406     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
407     cgproxy[curComlibController].barrier2();
408 }
409
410 void ComlibManager::resumeFromBarrier2(){
411     barrier2Reached = 1;
412     barrierReached = 0;
413
414     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
415
416     //    if(flushTable) {
417     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
418         if (!(* strategyTable)[count].tmplist.isEmpty()) {
419             CharmMessageHolder *cptr;
420             while (!(* strategyTable)[count].tmplist.isEmpty()) {
421                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
422                     (* strategyTable)[count].tmplist.deq();
423                 
424                 if((*strategyTable)[count].strategy->getType() == 
425                    ARRAY_STRATEGY) {
426                     if(cmsg->dest_proc >= 0) {
427                         envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
428                         cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
429                                                        env->getsetArrayIndex());
430                     }
431                     //else
432                     //  CkAbort("NOT FIXED YET\n");                    
433                 }                                
434                 (* strategyTable)[count].strategy->insertMessage(cmsg);
435             }
436         }
437         
438         if ((* strategyTable)[count].call_doneInserting) {
439             (* strategyTable)[count].call_doneInserting = 0;
440             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
441             (* strategyTable)[count].strategy->doneInserting();
442         }
443     }
444     //}    
445     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
446 }
447
448 extern int _charmHandlerIdx;
449
450 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
451                               const CkArrayIndexMax &idx, CkArrayID a){
452     
453     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
454
455     CkArrayIndexMax myidx = idx;
456     int dest_proc = getLastKnown(a, myidx); 
457     //CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
458     
459     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
460     //   UsrToEnv(msg)->getTotalsize(), receivedTable);
461
462     register envelope * env = UsrToEnv(msg);
463     
464     env->getsetArrayMgr()=a;
465     env->getsetArraySrcPe()=CkMyPe();
466     env->getsetArrayEp()=ep;
467     env->getsetArrayHops()=0;
468     env->getsetArrayIndex()=idx;
469     env->setUsed(0);
470     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
471
472     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
473
474     CkPackMessage(&env);
475     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
476     
477     if(isRemote) {
478         CharmMessageHolder *cmsg = new 
479             CharmMessageHolder((char *)msg, dest_proc);
480
481         remoteQ.enq(cmsg);
482         return;
483     }
484
485     //Any bug here? FOO BAR??
486     if(dest_proc == CkMyPe()){
487         CProxyElement_ArrayBase ap(a,idx);
488         ap.ckSend((CkArrayMessage *)msg, ep);
489         return;
490     }
491
492     //totalMsgCount ++;
493     //totalBytes += UsrToEnv(msg)->getTotalsize();
494
495     CharmMessageHolder *cmsg = new 
496         CharmMessageHolder((char *)msg, dest_proc);
497     //get rid of the new.
498
499     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
500
501     if (receivedTable)
502       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
503     else {
504         flushTable = 1;
505         (* strategyTable)[curStratID].tmplist.enq(cmsg);
506     }
507
508     //CmiPrintf("After Insert\n");
509 }
510
511
512 #include "qd.h"
513 //CkpvExtern(QdState*, _qd);
514
515 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
516     
517     int dest_proc = onPE;
518     /*
519     if(curStratID != prevStratID && prioEndIterationFlag) {        
520         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
521         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
522                      CkMyPe());
523         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg;
524         *(int *)CkPriorityPtr(pmsg) = -0x7FFFFFFF;
525         CkSetQueueing(pmsg, CK_QUEUEING_IFIFO);
526         cgproxy[CkMyPe()].prioEndIteration(pmsg);
527         prioEndIterationFlag = 0;
528     }        
529     prevStratID = curStratID;            
530     */
531
532     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
533                  UsrToEnv(msg)->getTotalsize());
534
535     register envelope * env = UsrToEnv(msg);
536     if(dest_proc == CkMyPe()){
537         _SET_USED(env, 0);
538         CkSendMsgBranch(ep, msg, dest_proc, gid);
539         return;
540     }
541     
542     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
543     CpvAccess(_qd)->create(1);
544
545     env->setMsgtype(ForBocMsg);
546     env->setEpIdx(ep);
547     env->setGroupNum(gid);
548     env->setSrcPe(CkMyPe());
549     env->setUsed(0);
550
551     CkPackMessage(&env);
552     CmiSetHandler(env, _charmHandlerIdx);
553
554     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
555     //get rid of the new.
556     
557     if(receivedTable)
558         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
559     else {
560         flushTable = 1;
561         (* strategyTable)[curStratID].tmplist.enq(cmsg);
562     }
563 }
564
565 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
566     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
567
568     //Broken, add the processor list here.
569
570     register envelope * env = UsrToEnv(m);
571     env->getsetArrayMgr()=a;
572     env->getsetArraySrcPe()=CkMyPe();
573     env->getsetArrayEp()=ep;
574     env->getsetArrayHops()=0;
575     env->getsetArrayIndex()= dummyArrayIndex;
576     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
577
578     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
579     
580     /*  //section multicast header not needed for broadcast
581       CkSectionInfo minfo;
582       minfo.type = COMLIB_MULTICAST_MESSAGE;
583       minfo.sInfo.cInfo.instId = curStratID;
584       minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
585       minfo.sInfo.cInfo.id = 0; 
586       minfo.pe = CkMyPe();
587       ((CkMcastBaseMsg *)m)->_cookie = minfo;       
588     */
589
590     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
591
592     CharmMessageHolder *cmsg = new 
593         CharmMessageHolder((char *)m, IS_BROADCAST);
594     cmsg->npes = 0;
595     cmsg->pelist = NULL;
596     cmsg->sec_id = NULL;
597
598     multicast(cmsg);
599 }
600
601 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
602                                      CkArrayID a, CkSectionID &s, int opts) {
603
604 #ifndef CMK_OPTIMIZE
605     traceUserEvent(section_send_event);
606 #endif
607
608     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
609
610     register envelope * env = UsrToEnv(m);
611     env->getsetArrayMgr()=a;
612     env->getsetArraySrcPe()=CkMyPe();
613     env->getsetArrayEp()=ep;
614     env->getsetArrayHops()=0;
615     env->getsetArrayIndex()= dummyArrayIndex;
616     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
617
618     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
619     
620     env->setUsed(0);    
621     CkPackMessage(&env);
622     
623     //totalMsgCount ++;
624     //totalBytes += env->getTotalsize();
625
626     //Provide a dummy dest proc as it does not matter for mulitcast 
627     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
628                                                       IS_SECTION_MULTICAST);
629     cmsg->npes = 0;
630     cmsg->sec_id = &s;
631
632     CkSectionInfo minfo;
633     minfo.type = COMLIB_MULTICAST_MESSAGE;
634     minfo.sInfo.cInfo.instId = curStratID;
635     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
636     minfo.sInfo.cInfo.id = 0; 
637     minfo.pe = CkMyPe();
638     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
639     //    s.npes = 0;
640     //s.pelist = NULL;
641
642     multicast(cmsg);
643 }
644
645 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
646     register envelope * env = UsrToEnv(m);
647
648     CpvAccess(_qd)->create(1);
649
650     env->setMsgtype(ForBocMsg);
651     env->setEpIdx(ep);
652     env->setGroupNum(g);
653     env->setSrcPe(CkMyPe());
654     env->setUsed(0);
655     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
656
657     CkPackMessage(&env);
658     CmiSetHandler(env, _charmHandlerIdx);
659     
660     //Provide a dummy dest proc as it does not matter for mulitcast 
661     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
662     
663     cmsg->npes = 0;
664     cmsg->pelist = NULL;
665
666     multicast(cmsg);
667 }
668
669 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
670
671     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
672     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
673
674     env->setUsed(0);    
675     CkPackMessage(&env);
676
677     //Will be used to detect multicast message for learning
678     //totalMsgCount ++;
679     //totalBytes += env->getTotalsize();
680     
681     if (receivedTable)
682         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
683     else {
684         flushTable = 1;
685         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
686         (* strategyTable)[curStratID].tmplist.enq(cmsg);
687     }
688
689     ComlibPrintf("After multicast\n");
690 }
691
692 //Collect statistics from all the processors, also gets the list of
693 //array elements on each processor.
694 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
695                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
696     
697     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
698
699     numStatsReceived ++;
700     clib_gstats.updateStats(stat, pe);
701     
702     for(int count = 0; count < idx_vec.length(); count++) {
703         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
704         
705         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
706                      pe);
707         
708         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
709     }        
710
711     if(numStatsReceived == CkNumPes()) {
712         numStatsReceived = 0;
713
714         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
715             count++ ){
716             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
717             if(strat->getType() > CONVERSE_STRATEGY) {
718                 CharmStrategy *cstrat = (CharmStrategy *)strat;
719                 ComlibLearner *learner = cstrat->getLearner();
720                 CharmStrategy *newstrat = NULL;
721                                 
722                 if(learner != NULL) {
723                     ComlibPrintf("Calling Learner\n");
724                     newstrat = (CharmStrategy *)learner->optimizePattern(strat, clib_gstats);
725                     if(newstrat != NULL)
726                         ListOfStrategies.enq(newstrat);
727                 }
728             }
729         }
730         barrierReached = 1;
731         
732         //if(lbUpdateReceived) {
733         //lbUpdateReceived = CmiFalse;
734         broadcastStrategies();
735         //}
736     }
737 }
738
739 void ComlibManager::setRemote(int remote_pe){
740
741     ComlibPrintf("Setting remote flag on\n");
742
743     remotePe = remote_pe;
744     isRemote = 1;
745 }
746
747
748 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
749                                       int strat_id) {
750     setInstance(strat_id);
751     
752     int nmsgs = rq.length();
753
754     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
755
756     for(int count = 0; count < nmsgs; count++) {
757         char *msg = rq.deq()->getCharmMessage();
758         envelope *env = UsrToEnv(msg);
759         
760         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
761                   env->getsetArrayMgr());
762     }
763
764     endIteration();
765 }
766
767 void ComlibManager::sendRemote(){
768     
769     int nmsgs = remoteQ.length();
770
771     //if(nmsgs == 0)
772     //  return;
773
774     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
775
776     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
777     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
778     
779     for(int count = 0; count < nmsgs; count++) {
780         CharmMessageHolder *cmsg = remoteQ.deq();
781         CkFreeMsg(cmsg->getCharmMessage());
782         delete cmsg;
783     }
784 }
785
786
787 void ComlibManager::AtSync() {
788
789     //comm_debug = 1;
790     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
791
792     barrier2Reached = 0;
793     receivedTable = 0;
794     barrierReached = 0;
795     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
796
797     int pos = 0;
798
799     CkVec<ClibGlobalArrayIndex> gidx_vec;
800
801     CkVec<CkArrayID> tmp_vec;
802     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
803         if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
804             CharmStrategy *cstrat = (CharmStrategy*)
805                 ((* strategyTable)[count].strategy);
806             
807             CkArrayID src, dest;
808             CkArrayIndexMax *elements;
809             int nelem;
810             
811             cstrat->ainfo.getSourceArray(src, elements, nelem);
812             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
813
814             CmiBool srcflag = CmiFalse;
815             CmiBool destflag = CmiFalse;
816             
817             if(src == dest || dest.isZero())
818                 destflag = CmiTrue;
819
820             if(src.isZero())
821                 srcflag = CmiTrue;                        
822
823             for(pos = 0; pos < tmp_vec.size(); pos++) {
824                 if(tmp_vec[pos] == src)
825                     srcflag = CmiTrue;
826
827                 if(tmp_vec[pos] == dest)
828                     destflag = CmiTrue;
829
830                 if(srcflag && destflag)
831                     break;
832             }
833
834             if(!srcflag)
835                 tmp_vec.insertAtEnd(src);
836
837             if(!destflag)
838                 tmp_vec.insertAtEnd(dest);
839         }
840         
841         //cant do it here, done in receiveTable
842         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
843         //  (* strategyTable)[count].reset();
844     }
845
846     for(pos = 0; pos < tmp_vec.size(); pos++) {
847         CkArrayID aid = tmp_vec[pos];
848
849         ComlibArrayListener *calistener = 
850             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
851
852         CkVec<CkArrayIndexMax> idx_vec;
853         calistener->getLocalIndices(idx_vec);
854
855         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
856             ClibGlobalArrayIndex gindex;
857             gindex.aid = aid;
858             gindex.idx = idx_vec[idx_count];
859
860             gidx_vec.insertAtEnd(gindex);
861         }
862     }
863
864     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
865     clib_stats.reset();
866 }
867
868 #include "lbdb.h"
869 #include "CentralLB.h"
870
871 /******** FOO BAR : NEEDS to be consistent with array manager *******/
872 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
873     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
874         CkAbort("LDB OBJ ID smaller than array index\n");
875     
876     //values higher than CkArrayIndexMax should be 0
877     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
878         idx.data()[count] = ld_id.id[count];
879     }
880     idx.nInts = 1;
881 }
882
883 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
884     for(int count = 0; count < msg->n_moves; count ++) {
885         MigrateInfo m = msg->moves[count];
886
887         CkArrayID aid; CkArrayIndexMax idx;
888         aid = CkArrayID(m.obj.omhandle.id.id);
889         LDObjID2IdxMax(m.obj.id, idx);
890
891         ClibGlobalArrayIndex cid; 
892         cid.aid = aid;
893         cid.idx = idx;
894         
895         int pe = CkpvAccess(locationTable)->get(cid);
896
897         //Value exists in the table, so update it
898         if(pe != 0) {
899             pe = m.to_pe + CkNumPes();
900             CkpvAccess(locationTable)->getRef(cid) = pe;
901         }
902         //otherwise we dont care about these objects
903     }   
904
905     lbUpdateReceived = CmiTrue;
906     if(barrierReached) {
907         broadcastStrategies();
908         barrierReached = 0;
909     }
910
911     CkFreeMsg(msg);
912 }
913
914 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
915     ComlibInstanceHandle *inst = new ComlibInstanceHandle
916         (*((ComlibInstanceHandle *)data));
917     return inst;
918 }
919
920
921 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
922                                                    CkDelegateData *pd) {
923
924     CmiBool to_pup = CmiFalse;
925
926     ComlibInstanceHandle *inst; 
927     if(!p.isUnpacking()) {
928         inst = (ComlibInstanceHandle *) pd;       
929         if(pd != NULL)
930             to_pup = CmiTrue;
931     }
932     else 
933         //Call migrate constructor
934         inst = new ComlibInstanceHandle();
935
936     p | to_pup;
937
938     if(to_pup)
939         inst->pup(p);
940     return inst;
941 }    
942
943
944 void ComlibDelegateProxy(CProxy *proxy){
945     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
946     proxy->ckDelegate(cgproxy.ckLocalBranch());
947 }
948
949 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
950     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
951         (CkGetComlibInstance());
952
953     cinst->setStrategy(strat);
954     
955     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
956     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
957
958
959 void ComlibBegin(CProxy &proxy) {
960     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
961     cinst->beginIteration();
962
963
964 void ComlibEnd(CProxy &proxy) {
965     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
966     cinst->endIteration();
967 }
968
969 ComlibInstanceHandle CkCreateComlibInstance(){
970     return CkGetComlibInstance();
971 }
972
973 ComlibInstanceHandle CkGetComlibInstance() {
974     if(CkMyPe() != 0)
975         CkAbort("Comlib Instance can only be created on Processor 0");
976     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
977     return (cgproxy.ckLocalBranch())->createInstance();
978 }
979
980 ComlibInstanceHandle CkGetComlibInstance(int id) {
981     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
982     return cinst;
983 }
984
985 void ComlibDoneCreating(){
986     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
987     (cgproxy.ckLocalBranch())->broadcastStrategies();
988 }
989
990 char *router;
991 int sfactor=0;
992
993 class ComlibManagerMain {
994 public:
995     ComlibManagerMain(CkArgMsg *msg) {
996         
997         if(CkMyPe() == 0 && msg !=  NULL)
998             CmiGetArgString(msg->argv, "+strategy", &router);         
999
1000         if(CkMyPe() == 0 && msg !=  NULL)
1001             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1002         
1003         CProxy_ComlibManager::ckNew();
1004     }
1005 };
1006
1007 //Called by user code
1008 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1009     _instid = -1;
1010     _dmid.setZero();
1011     _srcPe = -1;
1012     toForward = 0;
1013 }
1014
1015 //Called by user code
1016 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1017     : CkDelegateData() {
1018     _instid = h._instid;
1019     _dmid = h._dmid;
1020     toForward = h.toForward;        
1021
1022     ComlibPrintf("In Copy Constructor\n");
1023     _srcPe = h._srcPe;
1024
1025     reset();
1026     ref();
1027 }
1028
1029 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1030     _instid = h._instid;
1031     _dmid = h._dmid;
1032     toForward = h.toForward;
1033     _srcPe = h._srcPe;
1034     
1035     reset();
1036     ref();
1037     return *this;
1038 }
1039
1040 //Called by the communication library
1041 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1042     _instid = instid;
1043     _dmid   = dmid;
1044     _srcPe  = -1;
1045     toForward = 0;
1046 }
1047
1048 void ComlibInstanceHandle::beginIteration() { 
1049     CProxy_ComlibManager cgproxy(_dmid);
1050
1051     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1052
1053     //User forgot to make the instance handle a readonly or pass it
1054     //into the constructor of an array and is using it directly from
1055     //Main :: main
1056     if(_srcPe == -1) {
1057         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1058         _srcPe = CkMyPe();
1059     }
1060
1061     if(_srcPe != CkMyPe() && toForward) {
1062         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1063     }
1064
1065     (cgproxy.ckLocalBranch())->setInstance(_instid);
1066     (cgproxy.ckLocalBranch())->beginIteration();   
1067 }
1068
1069 void ComlibInstanceHandle::endIteration() {
1070     CProxy_ComlibManager cgproxy(_dmid);
1071     (cgproxy.ckLocalBranch())->endIteration();
1072 }
1073
1074 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1075     toForward = s->getForwardOnMigration();
1076     CProxy_ComlibManager cgproxy(_dmid);
1077     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1078 }
1079
1080 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1081     if(_instid < 0) 
1082         return NULL;    
1083     
1084     CProxy_ComlibManager cgproxy(_dmid);
1085     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1086 }
1087
1088 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1089
1090 void ComlibInitSectionID(CkSectionID &sid){
1091     
1092     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1093     sid._cookie.pe = CkMyPe();
1094     
1095     sid._cookie.sInfo.cInfo.id = 0;    
1096     sid.npes = 0;
1097     sid.pelist = NULL;
1098 }
1099
1100 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1101     CkSectionID &sid = sproxy->ckGetSectionID();
1102     ComlibInitSectionID(sid);
1103     sid._cookie.sInfo.cInfo.status = 0;
1104 }
1105
1106 // for backward compatibility - for old name commlib
1107 void _registercommlib(void)
1108 {
1109   static int _done = 0; 
1110   if(_done) 
1111       return; 
1112   _done = 1;
1113   _registercomlib();
1114 }
1115
1116 void ComlibAtSyncHandler(void *msg) {
1117     CmiFree(msg);
1118     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1119     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1120     if(cmgr_ptr)
1121         cmgr_ptr->AtSync();    
1122 }
1123
1124 void ComlibNotifyMigrationDoneHandler(void *msg) {
1125     CmiFree(msg);
1126     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1127     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1128     if(cmgr_ptr)
1129         cmgr_ptr->AtSync();    
1130 }
1131
1132
1133 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1134     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1135     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1136 }
1137
1138 #include "comlib.def.h"
1139
1140