Fixing bugs in the learning framework.
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_com_ptr);
22
23 //handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
27
28     if(msg == NULL)
29         return;
30     
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     int srcPe = env->getSrcPe();
37     int sid = ((CmiMsgHeaderExt *) env)->stratid;
38
39     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
40              sid, env->getTotalsize(), srcPe);
41
42     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
43     
44     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
45     //if(!comm_debug)
46     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
47
48     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
49     return;
50 }
51
52 void recv_combined_array_msg(void *msg){
53     if(msg == NULL)
54         return;
55     
56     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
57
58     MsgPacker::deliver((CombinedMessage *)msg);
59 }
60
61 ComlibManager::ComlibManager(){
62     init();
63     ComlibPrintf("In comlibmanager constructor\n");
64 }
65
66 void ComlibManager::init(){
67     
68     initComlibManager();
69
70     PUPable_reg(CharmStrategy);
71     PUPable_reg(CharmMessageHolder);
72     
73     //comm_debug = 1;
74     
75     numStatsReceived = 0;
76     curComlibController = 0;
77     clibIteration = 0;
78     
79     strategyCreated = CmiFalse;
80
81     CkpvInitialize(ClibLocationTableType*, locationTable);
82     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
83
84     CkpvInitialize(CkArrayIndexMax, cache_index);
85     CkpvInitialize(int, cache_pe);
86     CkpvInitialize(CkArrayID, cache_aid);
87
88     CkpvAccess(cache_index).nInts = -1;
89     CkpvAccess(cache_aid).setZero();
90
91     CkpvInitialize(int, RecvmsgHandle);
92     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
93
94     bcast_pelist = new int [CkNumPes()];
95     for(int brcount = 0; brcount < CkNumPes(); brcount++)
96         bcast_pelist[brcount] = brcount;
97
98     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
99     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
100         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
101     
102     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
103     
104     npes = CkNumPes();
105     pelist = NULL;
106
107     CkpvInitialize(CkGroupID, cmgrID);
108     CkpvAccess(cmgrID) = thisgroup;
109
110     dummyArrayIndex.nInts = 0;
111
112     curStratID = 0;
113     prevStratID = -1;
114     //prioEndIterationFlag = 1;
115
116     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
117     
118     receivedTable = 0;
119     flushTable = 0;
120     //    totalMsgCount = 0;
121     //    totalBytes = 0;
122     //nIterations = 0;
123     barrierReached = 0;
124     barrier2Reached = 0;
125
126     bcount = b2count = 0;
127     lbUpdateReceived = CmiFalse;
128
129     isRemote = 0;
130     remotePe = -1;
131
132     CkpvInitialize(int, migrationDoneHandlerID);
133     CkpvAccess(migrationDoneHandlerID) = 
134         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
135     
136     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
137     cgproxy[curComlibController].barrier();
138 }
139
140 //First barrier makes sure that the communication library group 
141 //has been created on all processors
142 void ComlibManager::barrier(){
143     ComlibPrintf("In barrier %d\n", bcount);
144     if(CkMyPe() == 0) {
145         bcount ++;
146         if(bcount == CkNumPes()){
147             bcount = 0;
148             barrierReached = 1;
149             barrier2Reached = 0;
150
151             if(strategyCreated)
152                 broadcastStrategies();
153         }
154     }
155 }
156
157 //Has finished passing the strategy list to all the processors
158 void ComlibManager::barrier2(){
159     if(CkMyPe() == 0) {
160         b2count ++;
161         ComlibPrintf("In barrier2 %d\n", bcount);
162         if(b2count == CkNumPes()) {
163             b2count = 0; 
164             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
165             cgproxy.resumeFromBarrier2();
166         }
167     }
168 }
169
170 //Registers a set of strategies with the communication library
171 ComlibInstanceHandle ComlibManager::createInstance() {
172   
173     CkpvAccess(conv_com_ptr)->nstrats++;    
174     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
175                                CkpvAccess(cmgrID));  
176     return cinst;
177 }
178
179 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
180     
181     strategyCreated = true;
182
183     ListOfStrategies.enq(strat);
184     strat->setInstance(pos);
185 }
186
187 //End of registering function, if barriers have been reached send them over
188 void ComlibManager::broadcastStrategies() {
189     if(!barrierReached)
190       return;    
191
192     lbUpdateReceived = CmiFalse;
193     barrierReached = 0;
194
195     ComlibPrintf("Sending Strategies %d, %d\n", 
196                  CkpvAccess(conv_com_ptr)->nstrats, 
197                  ListOfStrategies.length());
198
199     StrategyWrapper sw;
200     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
201
202     if(ListOfStrategies.length() > 0) {
203         int len = ListOfStrategies.length();
204         sw.s_table = new Strategy* [len];
205         sw.nstrats = len;
206         
207         for (int count=0; count < len; count++)
208             sw.s_table[count] = ListOfStrategies.deq();
209     }
210     else {
211         sw.nstrats = 0;
212         sw.s_table = 0;
213     }
214
215     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
216     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
217 }
218
219 //Called when the array/group element starts sending messages
220 void ComlibManager::beginIteration(){
221     //right now does not do anything might need later
222     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
223     //prioEndIterationFlag = 0;
224 }
225
226 void ComlibManager::setInstance(int instID){
227
228     curStratID = instID;
229     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
230 }
231
232 //called when the array elements has finished sending messages
233 void ComlibManager::endIteration(){
234     //    prioEndIterationFlag = 1;
235     prevStratID = -1;
236     
237     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
238                  curStratID, 
239                  (* strategyTable)[curStratID].elementCount, 
240                  (* strategyTable)[curStratID].numElements);
241
242     if(isRemote) {
243         isRemote = 0;
244         sendRemote();
245         return;
246     }
247
248     if(!receivedTable) {
249         (* strategyTable)[curStratID].nEndItr++;
250         return;
251     }        
252     
253     (* strategyTable)[curStratID].elementCount++;
254     int count = 0;
255     flushTable = 1;
256     
257     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
258         
259         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
260         
261         //nIterations ++;
262         /*
263         if(nIterations == LEARNING_PERIOD) {
264             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
265             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
266             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
267         }
268         */
269         
270         if(barrier2Reached) {       
271             (* strategyTable)[curStratID].strategy->doneInserting();
272         }
273         else (* strategyTable)[curStratID].call_doneInserting = 1;
274         
275         (* strategyTable)[curStratID].elementCount = 0;
276     }
277     ComlibPrintf("After EndIteration\n");
278 }
279
280 //receive the list of strategies
281 //Insert the strategies into the strategy table in converse comm lib.
282 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
283 void ComlibManager::receiveTable(StrategyWrapper &sw, 
284                                  CkHashtableT<ClibGlobalArrayIndex, int> 
285                                  &htable)
286 {
287
288     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
289                  clibIteration);
290
291     //Reset cached array element index. Location table may have changed
292     CkpvAccess(cache_index).nInts = -1;
293     CkpvAccess(cache_aid).setZero();
294
295     clibIteration ++;
296     receivedTable = 1;
297
298     delete CkpvAccess(locationTable);
299     CkpvAccess(locationTable) =  NULL;
300
301     CkpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
302
303     CkHashtableIterator *ht_iterator = htable.iterator();
304     ht_iterator->seekStart();
305     while(ht_iterator->hasNext()){
306         ClibGlobalArrayIndex *idx;
307         int *pe;
308         pe = (int *)ht_iterator->next((void **)&idx);
309         
310         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
311                      idx->idx.data()[0], *pe);
312
313         CkpvAccess(locationTable)->put(*idx) = *pe;       
314     }
315
316     CkArrayID st_aid;
317     int st_nelements;
318     CkArrayIndexMax *st_elem;
319     int temp_curstratid = curStratID;
320
321     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
322     clib_stats.setNstrats(sw.total_nstrats);
323
324     //First recreate strategies
325     int count = 0;
326     for(count = 0; count < sw.nstrats; count ++) {
327         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
328         
329         //set the instance to the current count
330         //currently all strategies are being copied to all processors
331         //later strategies will be selectively copied
332         
333         //location of this strategy table entry in the strategy table
334         int loc = cur_strategy->getInstance();
335         
336         if(loc >= MAX_NUM_STRATS)
337             CkAbort("Strategy table is full \n");
338
339         CharmStrategy *old_strategy;
340
341         //If this is a learning decision and the old strategy has to
342         //be gotten rid of, finalize it here.
343         if((old_strategy = 
344             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
345            != NULL) {
346             old_strategy->finalizeProcessing();
347
348             //Unregister from array listener if array strategy
349             if(old_strategy->getType() == ARRAY_STRATEGY) {
350                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
351                 as.getSourceArray(st_aid, st_elem, st_nelements);
352
353                 (* strategyTable)[loc].numElements = 0;
354                 if(!st_aid.isZero()) {
355                     ComlibArrayListener *calistener = CkArrayID::
356                         CkLocalBranch(st_aid)->getComlibArrayListener();
357                     
358                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
359                 }
360             }
361         }
362         
363         //Insert strategy, frees an old strategy and sets the
364         //strategy_table entry to point to the new one
365         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
366         
367         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
368
369         if(cur_strategy->getType() == ARRAY_STRATEGY &&
370            cur_strategy->isBracketed()){ 
371
372             ComlibPrintf("Inserting Array Listener\n");
373
374             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
375             as.getSourceArray(st_aid, st_elem, st_nelements);
376             
377             (* strategyTable)[loc].numElements = 0;
378             if(!st_aid.isZero()) {            
379                 ComlibArrayListener *calistener = 
380                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
381                 
382                 calistener->registerStrategy(&((* strategyTable)[loc]));
383             }
384         }                      
385         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
386           (* strategyTable)[loc].numElements = 1;
387         }
388         
389         (* strategyTable)[loc].elementCount = 0;
390         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
391     }
392
393     //Resume all end iterarions. Newer strategies may have more 
394     //or fewer elements to expect for!!
395     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
396         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
397                      CkMyPe(), count,
398                      (* strategyTable)[count].nEndItr);
399                          
400         curStratID = count;
401         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
402             endIteration();  
403         
404         (* strategyTable)[count].nEndItr = 0;        
405     }           
406     
407     curStratID = temp_curstratid;
408     ComlibPrintf("receivedTable %d\n", sw.nstrats);
409     
410     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
411     cgproxy[curComlibController].barrier2();
412 }
413
414 void ComlibManager::resumeFromBarrier2(){
415     barrier2Reached = 1;
416     barrierReached = 0;
417
418     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
419
420     //    if(flushTable) {
421     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
422         if (!(* strategyTable)[count].tmplist.isEmpty()) {
423             CharmMessageHolder *cptr;
424             while (!(* strategyTable)[count].tmplist.isEmpty()) {
425                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
426                     (* strategyTable)[count].tmplist.deq();
427                 
428                 if((*strategyTable)[count].strategy->getType() == 
429                    ARRAY_STRATEGY) {
430                     if(cmsg->dest_proc >= 0) {
431                         envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
432                         cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
433                                                        env->getsetArrayIndex());
434                     }
435                     //else
436                     //  CkAbort("NOT FIXED YET\n");                    
437                 }                                
438                 (* strategyTable)[count].strategy->insertMessage(cmsg);
439             }
440         }
441         
442         if ((* strategyTable)[count].call_doneInserting) {
443             (* strategyTable)[count].call_doneInserting = 0;
444             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
445             (* strategyTable)[count].strategy->doneInserting();
446         }
447     }
448     //}    
449     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
450 }
451
452 extern int _charmHandlerIdx;
453
454 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
455                               const CkArrayIndexMax &idx, CkArrayID a){
456     
457     if(pd != NULL) {
458         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
459         setInstance(ci->_instid);
460     }
461     
462     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
463     
464     CkArrayIndexMax myidx = idx;
465     int dest_proc = getLastKnown(a, myidx); 
466     int amgr_destpe = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
467     
468     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
469     //   UsrToEnv(msg)->getTotalsize(), receivedTable);
470
471     register envelope * env = UsrToEnv(msg);
472     
473     env->getsetArrayMgr()=a;
474     env->getsetArraySrcPe()=CkMyPe();
475     env->getsetArrayEp()=ep;
476     env->getsetArrayHops()=0;
477     env->getsetArrayIndex()=idx;
478     env->setUsed(0);
479     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
480
481     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
482
483     CkPackMessage(&env);
484     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
485     
486     if(isRemote) {
487         CharmMessageHolder *cmsg = new 
488             CharmMessageHolder((char *)msg, dest_proc);
489
490         remoteQ.enq(cmsg);
491         return;
492     }
493     
494     //Any bug here? FOO BAR??
495     if(amgr_destpe == CkMyPe()){
496         CProxyElement_ArrayBase ap(a,idx);
497         ap.ckSend((CkArrayMessage *)msg, ep);
498         return;
499     }
500     
501     //totalMsgCount ++;
502     //totalBytes += UsrToEnv(msg)->getTotalsize();
503
504     CharmMessageHolder *cmsg = new 
505         CharmMessageHolder((char *)msg, dest_proc);
506     //get rid of the new.
507
508     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
509
510     if (receivedTable)
511       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
512     else {
513         flushTable = 1;
514         (* strategyTable)[curStratID].tmplist.enq(cmsg);
515     }
516
517     //CmiPrintf("After Insert\n");
518 }
519
520
521 #include "qd.h"
522 //CkpvExtern(QdState*, _qd);
523
524 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
525     
526
527     if(pd != NULL) {
528         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
529         setInstance(ci->_instid);
530     }
531
532     int dest_proc = onPE;
533     /*
534     if(curStratID != prevStratID && prioEndIterationFlag) {        
535         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
536         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
537                      CkMyPe());
538         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg;
539         *(int *)CkPriorityPtr(pmsg) = -0x7FFFFFFF;
540         CkSetQueueing(pmsg, CK_QUEUEING_IFIFO);
541         cgproxy[CkMyPe()].prioEndIteration(pmsg);
542         prioEndIterationFlag = 0;
543     }        
544     prevStratID = curStratID;            
545     */
546
547     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
548                  UsrToEnv(msg)->getTotalsize());
549
550     register envelope * env = UsrToEnv(msg);
551     if(dest_proc == CkMyPe()){
552         _SET_USED(env, 0);
553         CkSendMsgBranch(ep, msg, dest_proc, gid);
554         return;
555     }
556     
557     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
558     CpvAccess(_qd)->create(1);
559
560     env->setMsgtype(ForBocMsg);
561     env->setEpIdx(ep);
562     env->setGroupNum(gid);
563     env->setSrcPe(CkMyPe());
564     env->setUsed(0);
565
566     CkPackMessage(&env);
567     CmiSetHandler(env, _charmHandlerIdx);
568
569     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
570     //get rid of the new.
571     
572     if(receivedTable)
573         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
574     else {
575         flushTable = 1;
576         (* strategyTable)[curStratID].tmplist.enq(cmsg);
577     }
578 }
579
580 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
581     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
582
583     if(pd != NULL) {
584         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
585         setInstance(ci->_instid);
586     }
587     
588     //Broken, add the processor list here.
589
590     register envelope * env = UsrToEnv(m);
591     env->getsetArrayMgr()=a;
592     env->getsetArraySrcPe()=CkMyPe();
593     env->getsetArrayEp()=ep;
594     env->getsetArrayHops()=0;
595     env->getsetArrayIndex()= dummyArrayIndex;
596     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
597
598     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
599     
600     /*  //section multicast header not needed for broadcast
601       CkSectionInfo minfo;
602       minfo.type = COMLIB_MULTICAST_MESSAGE;
603       minfo.sInfo.cInfo.instId = curStratID;
604       minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
605       minfo.sInfo.cInfo.id = 0; 
606       minfo.pe = CkMyPe();
607       ((CkMcastBaseMsg *)m)->_cookie = minfo;       
608     */
609
610     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
611
612     CharmMessageHolder *cmsg = new 
613         CharmMessageHolder((char *)m, IS_BROADCAST);
614     cmsg->npes = 0;
615     cmsg->pelist = NULL;
616     cmsg->sec_id = NULL;
617
618     multicast(cmsg);
619 }
620
621 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
622                                      CkArrayID a, CkSectionID &s, int opts) {
623
624 #ifndef CMK_OPTIMIZE
625     traceUserEvent(section_send_event);
626 #endif
627
628     if(pd != NULL) {
629         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
630         setInstance(ci->_instid);
631     }
632     
633     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
634
635     register envelope * env = UsrToEnv(m);
636     env->getsetArrayMgr()=a;
637     env->getsetArraySrcPe()=CkMyPe();
638     env->getsetArrayEp()=ep;
639     env->getsetArrayHops()=0;
640     env->getsetArrayIndex()= dummyArrayIndex;
641     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
642
643     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
644     
645     env->setUsed(0);    
646     CkPackMessage(&env);
647     
648     //totalMsgCount ++;
649     //totalBytes += env->getTotalsize();
650
651     //Provide a dummy dest proc as it does not matter for mulitcast 
652     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
653                                                       IS_SECTION_MULTICAST);
654     cmsg->npes = 0;
655     cmsg->sec_id = &s;
656
657     CkSectionInfo minfo;
658     minfo.type = COMLIB_MULTICAST_MESSAGE;
659     minfo.sInfo.cInfo.instId = curStratID;
660     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
661     minfo.sInfo.cInfo.id = 0; 
662     minfo.pe = CkMyPe();
663     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
664     //    s.npes = 0;
665     //s.pelist = NULL;
666
667     multicast(cmsg);
668 }
669
670 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
671     
672     if(pd != NULL) {
673         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
674         setInstance(ci->_instid);
675     }
676     
677     register envelope * env = UsrToEnv(m);
678     
679     CpvAccess(_qd)->create(1);
680
681     env->setMsgtype(ForBocMsg);
682     env->setEpIdx(ep);
683     env->setGroupNum(g);
684     env->setSrcPe(CkMyPe());
685     env->setUsed(0);
686     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
687
688     CkPackMessage(&env);
689     CmiSetHandler(env, _charmHandlerIdx);
690     
691     //Provide a dummy dest proc as it does not matter for mulitcast 
692     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
693     
694     cmsg->npes = 0;
695     cmsg->pelist = NULL;
696
697     multicast(cmsg);
698 }
699
700 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
701
702     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
703     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
704
705     env->setUsed(0);    
706     CkPackMessage(&env);
707
708     //Will be used to detect multicast message for learning
709     //totalMsgCount ++;
710     //totalBytes += env->getTotalsize();
711     
712     if (receivedTable)
713         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
714     else {
715         flushTable = 1;
716         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
717         (* strategyTable)[curStratID].tmplist.enq(cmsg);
718     }
719
720     ComlibPrintf("After multicast\n");
721 }
722
723 //Collect statistics from all the processors, also gets the list of
724 //array elements on each processor.
725 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
726                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
727     
728     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
729
730     numStatsReceived ++;
731     clib_gstats.updateStats(stat, pe);
732     
733     for(int count = 0; count < idx_vec.length(); count++) {
734         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
735         
736         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
737                      pe);
738         
739         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
740     }        
741
742     if(numStatsReceived == CkNumPes()) {
743         numStatsReceived = 0;
744
745         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
746             count++ ){
747             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
748             if(strat->getType() > CONVERSE_STRATEGY) {
749                 CharmStrategy *cstrat = (CharmStrategy *)strat;
750                 ComlibLearner *learner = cstrat->getLearner();
751                 CharmStrategy *newstrat = NULL;
752                                 
753                 if(learner != NULL) {
754                     ComlibPrintf("Calling Learner\n");
755                     newstrat = (CharmStrategy *)learner->optimizePattern(strat, clib_gstats);
756                     if(newstrat != NULL)
757                         ListOfStrategies.enq(newstrat);
758                 }
759             }
760         }
761         barrierReached = 1;
762         
763         //if(lbUpdateReceived) {
764         //lbUpdateReceived = CmiFalse;
765         broadcastStrategies();
766         //}
767     }
768 }
769
770 void ComlibManager::setRemote(int remote_pe){
771
772     ComlibPrintf("Setting remote flag on\n");
773
774     remotePe = remote_pe;
775     isRemote = 1;
776 }
777
778
779 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
780                                       int strat_id) {
781     setInstance(strat_id);
782     
783     int nmsgs = rq.length();
784
785     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
786
787     for(int count = 0; count < nmsgs; count++) {
788         char *msg = rq.deq()->getCharmMessage();
789         envelope *env = UsrToEnv(msg);
790         
791         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
792                   env->getsetArrayMgr());
793     }
794
795     endIteration();
796 }
797
798 void ComlibManager::sendRemote(){
799     
800     int nmsgs = remoteQ.length();
801
802     //if(nmsgs == 0)
803     //  return;
804
805     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
806
807     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
808     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
809     
810     for(int count = 0; count < nmsgs; count++) {
811         CharmMessageHolder *cmsg = remoteQ.deq();
812         CkFreeMsg(cmsg->getCharmMessage());
813         delete cmsg;
814     }
815 }
816
817
818 void ComlibManager::AtSync() {
819
820     //comm_debug = 1;
821     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
822
823     barrier2Reached = 0;
824     receivedTable = 0;
825     barrierReached = 0;
826     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
827
828     int pos = 0;
829
830     CkVec<ClibGlobalArrayIndex> gidx_vec;
831
832     CkVec<CkArrayID> tmp_vec;
833     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
834         if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
835             CharmStrategy *cstrat = (CharmStrategy*)
836                 ((* strategyTable)[count].strategy);
837             
838             CkArrayID src, dest;
839             CkArrayIndexMax *elements;
840             int nelem;
841             
842             cstrat->ainfo.getSourceArray(src, elements, nelem);
843             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
844
845             CmiBool srcflag = CmiFalse;
846             CmiBool destflag = CmiFalse;
847             
848             if(src == dest || dest.isZero())
849                 destflag = CmiTrue;
850
851             if(src.isZero())
852                 srcflag = CmiTrue;                        
853
854             for(pos = 0; pos < tmp_vec.size(); pos++) {
855                 if(tmp_vec[pos] == src)
856                     srcflag = CmiTrue;
857
858                 if(tmp_vec[pos] == dest)
859                     destflag = CmiTrue;
860
861                 if(srcflag && destflag)
862                     break;
863             }
864
865             if(!srcflag)
866                 tmp_vec.insertAtEnd(src);
867
868             if(!destflag)
869                 tmp_vec.insertAtEnd(dest);
870         }
871         
872         //cant do it here, done in receiveTable
873         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
874         //  (* strategyTable)[count].reset();
875     }
876
877     for(pos = 0; pos < tmp_vec.size(); pos++) {
878         CkArrayID aid = tmp_vec[pos];
879
880         ComlibArrayListener *calistener = 
881             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
882
883         CkVec<CkArrayIndexMax> idx_vec;
884         calistener->getLocalIndices(idx_vec);
885
886         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
887             ClibGlobalArrayIndex gindex;
888             gindex.aid = aid;
889             gindex.idx = idx_vec[idx_count];
890
891             gidx_vec.insertAtEnd(gindex);
892         }
893     }
894
895     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
896     clib_stats.reset();
897 }
898
899 #include "lbdb.h"
900 #include "CentralLB.h"
901
902 /******** FOO BAR : NEEDS to be consistent with array manager *******/
903 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
904     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
905         CkAbort("LDB OBJ ID smaller than array index\n");
906     
907     //values higher than CkArrayIndexMax should be 0
908     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
909         idx.data()[count] = ld_id.id[count];
910     }
911     idx.nInts = 1;
912 }
913
914 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
915     for(int count = 0; count < msg->n_moves; count ++) {
916         MigrateInfo m = msg->moves[count];
917
918         CkArrayID aid; CkArrayIndexMax idx;
919         aid = CkArrayID(m.obj.omhandle.id.id);
920         LDObjID2IdxMax(m.obj.id, idx);
921
922         ClibGlobalArrayIndex cid; 
923         cid.aid = aid;
924         cid.idx = idx;
925         
926         int pe = CkpvAccess(locationTable)->get(cid);
927
928         //Value exists in the table, so update it
929         if(pe != 0) {
930             pe = m.to_pe + CkNumPes();
931             CkpvAccess(locationTable)->getRef(cid) = pe;
932         }
933         //otherwise we dont care about these objects
934     }   
935
936     lbUpdateReceived = CmiTrue;
937     if(barrierReached) {
938         broadcastStrategies();
939         barrierReached = 0;
940     }
941
942     CkFreeMsg(msg);
943 }
944
945 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
946     ComlibInstanceHandle *inst = new ComlibInstanceHandle
947         (*((ComlibInstanceHandle *)data));
948     return inst;
949 }
950
951
952 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
953                                                    CkDelegateData *pd) {
954
955     if(pd == NULL)
956         return NULL;
957
958     CmiBool to_pup = CmiFalse;
959
960     ComlibInstanceHandle *inst; 
961     if(!p.isUnpacking()) {
962         inst = (ComlibInstanceHandle *) pd;       
963         if(pd != NULL)
964             to_pup = CmiTrue;
965     }
966     else 
967         //Call migrate constructor
968         inst = new ComlibInstanceHandle();
969
970     p | to_pup;
971
972     if(to_pup)
973         inst->pup(p);
974     return inst;
975 }    
976
977
978 void ComlibDelegateProxy(CProxy *proxy){
979     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
980     proxy->ckDelegate(cgproxy.ckLocalBranch(), NULL);
981 }
982
983 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
984     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
985         (CkGetComlibInstance());
986
987     cinst->setStrategy(strat);
988     
989     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
990     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
991
992
993 void ComlibBegin(CProxy &proxy) {
994     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
995     cinst->beginIteration();
996
997
998 void ComlibEnd(CProxy &proxy) {
999     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
1000     cinst->endIteration();
1001 }
1002
1003 ComlibInstanceHandle CkCreateComlibInstance(){
1004     return CkGetComlibInstance();
1005 }
1006
1007 ComlibInstanceHandle CkGetComlibInstance() {
1008     if(CkMyPe() != 0)
1009         CkAbort("Comlib Instance can only be created on Processor 0");
1010     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
1011     return (cgproxy.ckLocalBranch())->createInstance();
1012 }
1013
1014 ComlibInstanceHandle CkGetComlibInstance(int id) {
1015     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
1016     return cinst;
1017 }
1018
1019 void ComlibDoneCreating(){
1020     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1021     (cgproxy.ckLocalBranch())->broadcastStrategies();
1022 }
1023
1024 char *router;
1025 int sfactor=0;
1026
1027 class ComlibManagerMain {
1028 public:
1029     ComlibManagerMain(CkArgMsg *msg) {
1030         
1031         if(CkMyPe() == 0 && msg !=  NULL)
1032             CmiGetArgString(msg->argv, "+strategy", &router);         
1033
1034         if(CkMyPe() == 0 && msg !=  NULL)
1035             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1036         
1037         CProxy_ComlibManager::ckNew();
1038     }
1039 };
1040
1041 //Called by user code
1042 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1043     _instid = -1;
1044     _dmid.setZero();
1045     _srcPe = -1;
1046     toForward = 0;
1047 }
1048
1049 //Called by user code
1050 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1051     : CkDelegateData() {
1052     _instid = h._instid;
1053     _dmid = h._dmid;
1054     toForward = h.toForward;        
1055
1056     ComlibPrintf("In Copy Constructor\n");
1057     _srcPe = h._srcPe;
1058
1059     reset();
1060     ref();
1061 }
1062
1063 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1064     _instid = h._instid;
1065     _dmid = h._dmid;
1066     toForward = h.toForward;
1067     _srcPe = h._srcPe;
1068     
1069     reset();
1070     ref();
1071     return *this;
1072 }
1073
1074 //Called by the communication library
1075 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1076     _instid = instid;
1077     _dmid   = dmid;
1078     _srcPe  = -1;
1079     toForward = 0;
1080 }
1081
1082 void ComlibInstanceHandle::beginIteration() { 
1083     CProxy_ComlibManager cgproxy(_dmid);
1084
1085     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1086
1087     //User forgot to make the instance handle a readonly or pass it
1088     //into the constructor of an array and is using it directly from
1089     //Main :: main
1090     if(_srcPe == -1) {
1091         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1092         _srcPe = CkMyPe();
1093     }
1094
1095     if(_srcPe != CkMyPe() && toForward) {
1096         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1097     }
1098
1099     (cgproxy.ckLocalBranch())->setInstance(_instid);
1100     (cgproxy.ckLocalBranch())->beginIteration();   
1101 }
1102
1103 void ComlibInstanceHandle::endIteration() {
1104     CProxy_ComlibManager cgproxy(_dmid);
1105     (cgproxy.ckLocalBranch())->endIteration();
1106 }
1107
1108 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1109     toForward = s->getForwardOnMigration();
1110     CProxy_ComlibManager cgproxy(_dmid);
1111     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1112 }
1113
1114 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1115     if(_instid < 0) 
1116         return NULL;    
1117     
1118     CProxy_ComlibManager cgproxy(_dmid);
1119     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1120 }
1121
1122 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1123
1124 void ComlibInitSectionID(CkSectionID &sid){
1125     
1126     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1127     sid._cookie.pe = CkMyPe();
1128     
1129     sid._cookie.sInfo.cInfo.id = 0;    
1130     sid.npes = 0;
1131     sid.pelist = NULL;
1132 }
1133
1134 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1135     CkSectionID &sid = sproxy->ckGetSectionID();
1136     ComlibInitSectionID(sid);
1137     sid._cookie.sInfo.cInfo.status = 0;
1138 }
1139
1140 // for backward compatibility - for old name commlib
1141 void _registercommlib(void)
1142 {
1143   static int _done = 0; 
1144   if(_done) 
1145       return; 
1146   _done = 1;
1147   _registercomlib();
1148 }
1149
1150 void ComlibAtSyncHandler(void *msg) {
1151     CmiFree(msg);
1152     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1153     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1154     if(cmgr_ptr)
1155         cmgr_ptr->AtSync();    
1156 }
1157
1158 void ComlibNotifyMigrationDoneHandler(void *msg) {
1159     CmiFree(msg);
1160     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1161     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1162     if(cmgr_ptr)
1163         cmgr_ptr->AtSync();    
1164 }
1165
1166
1167 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1168     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1169     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1170 }
1171
1172 #include "comlib.def.h"
1173
1174