0d734c663b89878052f5503a8209863b388c33f0
[charm.git] / src / ck-com / ComlibManager.C
1
2 #include "ComlibManager.h"
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_com_ptr);
22
23 //Handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
27
28     if(msg == NULL)
29         return;
30     
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     int srcPe = env->getSrcPe();
37     int sid = ((CmiMsgHeaderExt *) env)->stratid;
38
39     ComlibPrintf("%d: Recording receive %d, %d, %d\n", CkMyPe(), 
40              sid, env->getTotalsize(), srcPe);
41
42     RECORD_RECV_STATS(sid, env->getTotalsize(), srcPe);
43     
44     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
45     //if(!comm_debug)
46     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
47
48     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
49     return;
50 }
51
52 void recv_combined_array_msg(void *msg){
53     if(msg == NULL)
54         return;
55     
56     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
57
58     MsgPacker::deliver((CombinedMessage *)msg);
59 }
60
61 ComlibManager::ComlibManager(){
62     init();
63     ComlibPrintf("In comlibmanager constructor\n");
64 }
65
66 void ComlibManager::init(){
67     
68     initComlibManager();
69
70     PUPable_reg(CharmStrategy);
71     PUPable_reg(CharmMessageHolder);
72     
73     //comm_debug = 1;
74     
75     numStatsReceived = 0;
76     curComlibController = 0;
77     clibIteration = 0;
78     
79     strategyCreated = CmiFalse;
80
81     CkpvInitialize(ClibLocationTableType*, locationTable);
82     CkpvAccess(locationTable) = new CkHashtableT <ClibGlobalArrayIndex, int>;
83
84     CkpvInitialize(CkArrayIndexMax, cache_index);
85     CkpvInitialize(int, cache_pe);
86     CkpvInitialize(CkArrayID, cache_aid);
87
88     CkpvAccess(cache_index).nInts = -1;
89     CkpvAccess(cache_aid).setZero();
90
91     CkpvInitialize(int, RecvmsgHandle);
92     CkpvAccess(RecvmsgHandle) =CkRegisterHandler((CmiHandler)recv_array_msg);
93
94     bcast_pelist = new int [CkNumPes()];
95     for(int brcount = 0; brcount < CkNumPes(); brcount++)
96         bcast_pelist[brcount] = brcount;
97
98     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
99     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
100         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
101     
102     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
103     
104     npes = CkNumPes();
105     pelist = NULL;
106
107     CkpvInitialize(CkGroupID, cmgrID);
108     CkpvAccess(cmgrID) = thisgroup;
109
110     dummyArrayIndex.nInts = 0;
111
112     curStratID = 0;
113     prevStratID = -1;
114     //prioEndIterationFlag = 1;
115
116     strategyTable = CkpvAccess(conv_com_ptr)->getStrategyTable();
117     
118     receivedTable = 0;
119     flushTable = 0;
120
121     barrierReached = 0;
122     barrier2Reached = 0;
123
124     bcount = b2count = 0;
125     lbUpdateReceived = CmiFalse;
126
127     isRemote = 0;
128     remotePe = -1;
129
130     CkpvInitialize(int, migrationDoneHandlerID);
131     CkpvAccess(migrationDoneHandlerID) = 
132         CkRegisterHandler((CmiHandler) ComlibNotifyMigrationDoneHandler);
133     
134     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
135     cgproxy[curComlibController].barrier();
136 }
137
138 //First barrier makes sure that the communication library group 
139 //has been created on all processors
140 void ComlibManager::barrier(){
141     ComlibPrintf("In barrier %d\n", bcount);
142     if(CkMyPe() == 0) {
143         bcount ++;
144         if(bcount == CkNumPes()){
145             bcount = 0;
146             barrierReached = 1;
147             barrier2Reached = 0;
148
149             if(strategyCreated)
150                 broadcastStrategies();
151         }
152     }
153 }
154
155 //Has finished passing the strategy list to all the processors
156 void ComlibManager::barrier2(){
157     if(CkMyPe() == 0) {
158         b2count ++;
159         ComlibPrintf("In barrier2 %d\n", bcount);
160         if(b2count == CkNumPes()) {
161             b2count = 0; 
162             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
163             cgproxy.resumeFromBarrier2();
164         }
165     }
166 }
167
168 //Registers a set of strategies with the communication library
169 ComlibInstanceHandle ComlibManager::createInstance() {
170   
171     CkpvAccess(conv_com_ptr)->nstrats++;    
172     ComlibInstanceHandle cinst(CkpvAccess(conv_com_ptr)->nstrats -1,
173                                CkpvAccess(cmgrID));  
174     return cinst;
175 }
176
177 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
178     
179     strategyCreated = true;
180
181     ListOfStrategies.enq(strat);
182     strat->setInstance(pos);
183 }
184
185 //End of registering function, if barriers have been reached send them over
186 void ComlibManager::broadcastStrategies() {
187     if(!barrierReached)
188       return;    
189
190     lbUpdateReceived = CmiFalse;
191     barrierReached = 0;
192
193     ComlibPrintf("Sending Strategies %d, %d\n", 
194                  CkpvAccess(conv_com_ptr)->nstrats, 
195                  ListOfStrategies.length());
196
197     StrategyWrapper sw;
198     sw.total_nstrats = CkpvAccess(conv_com_ptr)->nstrats;
199
200     if(ListOfStrategies.length() > 0) {
201         int len = ListOfStrategies.length();
202         sw.s_table = new Strategy* [len];
203         sw.nstrats = len;
204         
205         for (int count=0; count < len; count++)
206             sw.s_table[count] = ListOfStrategies.deq();
207     }
208     else {
209         sw.nstrats = 0;
210         sw.s_table = 0;
211     }
212
213     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
214     cgproxy.receiveTable(sw, *CkpvAccess(locationTable));
215 }
216
217 //Called when the array/group element starts sending messages
218 void ComlibManager::beginIteration(){
219     //right now does not do anything might need later
220     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
221     //prioEndIterationFlag = 0;
222 }
223
224 void ComlibManager::setInstance(int instID){
225
226     curStratID = instID;
227     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
228 }
229
230 //called when the array elements has finished sending messages
231 void ComlibManager::endIteration(){
232     //    prioEndIterationFlag = 1;
233     prevStratID = -1;
234     
235     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), 
236                  curStratID, 
237                  (* strategyTable)[curStratID].elementCount, 
238                  (* strategyTable)[curStratID].numElements);
239
240     if(isRemote) {
241         isRemote = 0;
242         sendRemote();
243         return;
244     }
245
246     if(!receivedTable) {
247         (* strategyTable)[curStratID].nEndItr++;
248         return;
249     }        
250     
251     (* strategyTable)[curStratID].elementCount++;
252     int count = 0;
253     flushTable = 1;
254     
255     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
256         
257         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
258         
259         if(barrier2Reached) {       
260             (* strategyTable)[curStratID].strategy->doneInserting();
261         }
262         else (* strategyTable)[curStratID].call_doneInserting = 1;
263         
264         (* strategyTable)[curStratID].elementCount = 0;
265     }
266     ComlibPrintf("After EndIteration\n");
267 }
268
269 //receive the list of strategies
270 //Insert the strategies into the strategy table in converse comm lib.
271 //CkpvAccess(conv_com_ptr) points to the converse commlib instance
272 void ComlibManager::receiveTable(StrategyWrapper &sw, 
273                                  CkHashtableT<ClibGlobalArrayIndex, int> 
274                                  &htable)
275 {
276
277     ComlibPrintf("[%d] In receiveTable %d, ite=%d\n", CkMyPe(), sw.nstrats, 
278                  clibIteration);
279
280     clibIteration ++;
281     receivedTable = 1;
282
283     delete CkpvAccess(locationTable);
284     CkpvAccess(locationTable) =  NULL;
285
286     CkpvAccess(locationTable) = new CkHashtableT<ClibGlobalArrayIndex, int>;
287
288     CkHashtableIterator *ht_iterator = htable.iterator();
289     ht_iterator->seekStart();
290     while(ht_iterator->hasNext()){
291         ClibGlobalArrayIndex *idx;
292         int *pe;
293         pe = (int *)ht_iterator->next((void **)&idx);
294         
295         ComlibPrintf("[%d] HASH idx %d on %d\n", CkMyPe(), 
296                      idx->idx.data()[0], *pe);
297
298         CkpvAccess(locationTable)->put(*idx) = *pe;       
299     }
300     
301     //Reset cached array element index. Location table may have changed
302     CkpvAccess(cache_index).nInts = -1;
303     CkpvAccess(cache_aid).setZero();
304
305     CkArrayID st_aid;
306     int st_nelements;
307     CkArrayIndexMax *st_elem;
308     int temp_curstratid = curStratID;
309
310     CkpvAccess(conv_com_ptr)->nstrats = sw.total_nstrats;
311     clib_stats.setNstrats(sw.total_nstrats);
312
313     //First recreate strategies
314     int count = 0;
315     for(count = 0; count < sw.nstrats; count ++) {
316         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
317         
318         //set the instance to the current count
319         //currently all strategies are being copied to all processors
320         //later strategies will be selectively copied
321         
322         //location of this strategy table entry in the strategy table
323         int loc = cur_strategy->getInstance();
324         
325         if(loc >= MAX_NUM_STRATS)
326             CkAbort("Strategy table is full \n");
327
328         CharmStrategy *old_strategy;
329
330         //If this is a learning decision and the old strategy has to
331         //be gotten rid of, finalize it here.
332         if((old_strategy = 
333             (CharmStrategy *)CkpvAccess(conv_com_ptr)->getStrategy(loc)) 
334            != NULL) {
335             old_strategy->finalizeProcessing();
336
337             //Unregister from array listener if array strategy
338             if(old_strategy->getType() == ARRAY_STRATEGY) {
339                 ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
340                 as.getSourceArray(st_aid, st_elem, st_nelements);
341
342                 (* strategyTable)[loc].numElements = 0;
343                 if(!st_aid.isZero()) {
344                     ComlibArrayListener *calistener = CkArrayID::
345                         CkLocalBranch(st_aid)->getComlibArrayListener();
346                     
347                     calistener->unregisterStrategy(&((*strategyTable)[loc]));
348                 }
349             }
350         }
351         
352         //Insert strategy, frees an old strategy and sets the
353         //strategy_table entry to point to the new one
354         CkpvAccess(conv_com_ptr)->insertStrategy(cur_strategy, loc);
355         
356         ComlibPrintf("[%d] Inserting_strategy \n", CkMyPe());       
357
358         if(cur_strategy->getType() == ARRAY_STRATEGY &&
359            cur_strategy->isBracketed()){ 
360
361             ComlibPrintf("Inserting Array Listener\n");
362
363             ComlibArrayInfo &as = ((CharmStrategy *)cur_strategy)->ainfo;
364             as.getSourceArray(st_aid, st_elem, st_nelements);
365             
366             (* strategyTable)[loc].numElements = 0;
367             if(!st_aid.isZero()) {            
368                 ComlibArrayListener *calistener = 
369                     CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
370                 
371                 calistener->registerStrategy(&((* strategyTable)[loc]));
372             }
373         }                      
374         else { //if(cur_strategy->getType() == GROUP_STRATEGY){
375           (* strategyTable)[loc].numElements = 1;
376         }
377         
378         (* strategyTable)[loc].elementCount = 0;
379         cur_strategy->beginProcessing((* strategyTable)[loc].numElements); 
380     }
381
382     //Resume all end iterarions. Newer strategies may have more 
383     //or fewer elements to expect for!!
384     for(count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count++) {
385         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
386                      CkMyPe(), count,
387                      (* strategyTable)[count].nEndItr);
388                          
389         curStratID = count;
390         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
391             endIteration();  
392         
393         (* strategyTable)[count].nEndItr = 0;        
394     }           
395     
396     curStratID = temp_curstratid;
397     ComlibPrintf("receivedTable %d\n", sw.nstrats);
398     
399     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
400     cgproxy[curComlibController].barrier2();
401 }
402
403 void ComlibManager::resumeFromBarrier2(){
404
405     barrier2Reached = 1;
406     barrierReached = 0;
407
408     ComlibPrintf("[%d] Barrier 2 reached nstrats = %d, ite = %d\n", CkMyPe(), CkpvAccess(conv_com_ptr)->nstrats, clibIteration);
409
410     //    if(flushTable) {
411     for (int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
412         if (!(* strategyTable)[count].tmplist.isEmpty()) {
413             CharmMessageHolder *cptr;
414             while (!(* strategyTable)[count].tmplist.isEmpty()) {
415                 CharmMessageHolder *cmsg = (CharmMessageHolder *) 
416                     (* strategyTable)[count].tmplist.deq();
417                 
418                 if((*strategyTable)[count].strategy->getType() == 
419                    ARRAY_STRATEGY) {
420                     if(cmsg->dest_proc >= 0) {
421                         envelope *env  = UsrToEnv(cmsg->getCharmMessage()); 
422                         cmsg->dest_proc = getLastKnown(env->getsetArrayMgr(), 
423                                                        env->getsetArrayIndex());
424                     }
425                     //else multicast
426                 }                                
427
428                 if(cmsg->dest_proc == CkMyPe()) {
429                     CmiSyncSendAndFree(CkMyPe(), cmsg->size,
430                                        (char *)
431                                        UsrToEnv(cmsg->getCharmMessage()));
432                     delete cmsg;
433                 }
434                 else
435                     (* strategyTable)[count].strategy->insertMessage(cmsg);
436             }
437         }
438         
439         if ((* strategyTable)[count].call_doneInserting) {
440             (* strategyTable)[count].call_doneInserting = 0;
441             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
442             (* strategyTable)[count].strategy->doneInserting();
443         }
444     }
445     //}    
446     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
447 }
448
449 extern int _charmHandlerIdx;
450
451 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
452                               const CkArrayIndexMax &idx, CkArrayID a){
453     
454     if(pd != NULL) {
455         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
456         setInstance(ci->_instid);
457     }
458     
459     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
460     
461     CkArrayIndexMax myidx = idx;
462     int dest_proc = getLastKnown(a, myidx); 
463     
464     //Reading from two hash tables is a big overhead
465     //int amgr_destpe = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
466     
467     register envelope * env = UsrToEnv(msg);
468     
469     env->getsetArrayMgr()=a;
470     env->getsetArraySrcPe()=CkMyPe();
471     env->getsetArrayEp()=ep;
472     env->getsetArrayHops()=0;
473     env->getsetArrayIndex()=idx;
474     env->setUsed(0);
475     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
476
477     //RECORD_SEND_STATS(curStratID, env->getTotalsize(), dest_proc);
478
479     if(isRemote) {
480         CkPackMessage(&env);        
481         CharmMessageHolder *cmsg = new 
482             CharmMessageHolder((char *)msg, dest_proc);
483         
484         remoteQ.enq(cmsg);
485         return;
486     }
487     
488     //With migration some array messages may be directly sent Also no
489     //message processing should happen before the comlib barriers have
490     //gone through
491     if(dest_proc == CkMyPe() && receivedTable){                
492         CkArray *amgr = (CkArray *)_localBranch(a);
493         amgr->deliver((CkArrayMessage *)msg, CkDeliver_queue);
494         
495         return;
496     }
497
498     CkPackMessage(&env);
499     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));        
500     
501     //get rid of the new.
502     CharmMessageHolder *cmsg = new 
503         CharmMessageHolder((char *)msg, dest_proc);
504     
505     ComlibPrintf("[%d] Before Insert on strat %d received = %d\n", CkMyPe(), curStratID, receivedTable);
506     
507     if (receivedTable)        
508         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
509     else {
510         flushTable = 1;
511         (* strategyTable)[curStratID].tmplist.enq(cmsg);
512     }
513     
514     //CmiPrintf("After Insert\n");
515 }
516
517
518 #include "qd.h"
519 //CkpvExtern(QdState*, _qd);
520
521 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
522     
523
524     if(pd != NULL) {
525         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
526         setInstance(ci->_instid);
527     }
528
529     int dest_proc = onPE;
530
531     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
532                  UsrToEnv(msg)->getTotalsize());
533
534     register envelope * env = UsrToEnv(msg);
535     if(dest_proc == CkMyPe() && receivedTable){
536         _SET_USED(env, 0);
537         CkSendMsgBranch(ep, msg, dest_proc, gid);
538         return;
539     }
540     
541     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
542     CpvAccess(_qd)->create(1);
543
544     env->setMsgtype(ForBocMsg);
545     env->setEpIdx(ep);
546     env->setGroupNum(gid);
547     env->setSrcPe(CkMyPe());
548     env->setUsed(0);
549
550     CkPackMessage(&env);
551     CmiSetHandler(env, _charmHandlerIdx);
552
553     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
554     //get rid of the new.
555     
556     if(receivedTable)
557         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
558     else {
559         flushTable = 1;
560         (* strategyTable)[curStratID].tmplist.enq(cmsg);
561     }
562 }
563
564 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
565     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
566
567     if(pd != NULL) {
568         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
569         setInstance(ci->_instid);
570     }
571     
572     //Broken, add the processor list here.
573
574     register envelope * env = UsrToEnv(m);
575     env->getsetArrayMgr()=a;
576     env->getsetArraySrcPe()=CkMyPe();
577     env->getsetArrayEp()=ep;
578     env->getsetArrayHops()=0;
579     env->getsetArrayIndex()= dummyArrayIndex;
580     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
581
582     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
583     
584     //RECORD_SENDM_STATS(curStratID, env->getTotalsize(), dest_proc);
585
586     CharmMessageHolder *cmsg = new 
587         CharmMessageHolder((char *)m, IS_BROADCAST);
588     cmsg->npes = 0;
589     cmsg->pelist = NULL;
590     cmsg->sec_id = NULL;
591
592     multicast(cmsg);
593 }
594
595 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, 
596                                      CkArrayID a, CkSectionID &s, int opts) {
597
598 #ifndef CMK_OPTIMIZE
599     traceUserEvent(section_send_event);
600 #endif
601
602     if(pd != NULL) {
603         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
604         setInstance(ci->_instid);
605     }
606     
607     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
608
609     register envelope * env = UsrToEnv(m);
610     env->getsetArrayMgr()=a;
611     env->getsetArraySrcPe()=CkMyPe();
612     env->getsetArrayEp()=ep;
613     env->getsetArrayHops()=0;
614     env->getsetArrayIndex()= dummyArrayIndex;
615     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
616
617     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
618     
619     env->setUsed(0);    
620     CkPackMessage(&env);
621     
622     //Provide a dummy dest proc as it does not matter for mulitcast 
623     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,
624                                                       IS_SECTION_MULTICAST);
625     cmsg->npes = 0;
626     cmsg->sec_id = &s;
627
628     CkSectionInfo minfo;
629     minfo.type = COMLIB_MULTICAST_MESSAGE;
630     minfo.sInfo.cInfo.instId = curStratID;
631     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
632     minfo.sInfo.cInfo.id = 0; 
633     minfo.pe = CkMyPe();
634     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
635     //    s.npes = 0;
636     //s.pelist = NULL;
637
638     multicast(cmsg);
639 }
640
641 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
642     
643     if(pd != NULL) {
644         ComlibInstanceHandle *ci = (ComlibInstanceHandle *)pd;
645         setInstance(ci->_instid);
646     }
647     
648     register envelope * env = UsrToEnv(m);
649     
650     CpvAccess(_qd)->create(1);
651
652     env->setMsgtype(ForBocMsg);
653     env->setEpIdx(ep);
654     env->setGroupNum(g);
655     env->setSrcPe(CkMyPe());
656     env->setUsed(0);
657     ((CmiMsgHeaderExt *)env)->stratid = curStratID;
658
659     CkPackMessage(&env);
660     CmiSetHandler(env, _charmHandlerIdx);
661     
662     //Provide a dummy dest proc as it does not matter for mulitcast 
663     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_BROADCAST);
664     
665     cmsg->npes = 0;
666     cmsg->pelist = NULL;
667
668     multicast(cmsg);
669 }
670
671 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
672
673     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
674     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
675
676     env->setUsed(0);    
677     CkPackMessage(&env);
678
679     //Will be used to detect multicast message for learning
680     
681     if (receivedTable)
682         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
683     else {
684         flushTable = 1;
685         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
686         (* strategyTable)[curStratID].tmplist.enq(cmsg);
687     }
688
689     ComlibPrintf("After multicast\n");
690 }
691
692 //Collect statistics from all the processors, also gets the list of
693 //array elements on each processor.
694 void ComlibManager::collectStats(ComlibLocalStats &stat, int pe, 
695                                  CkVec<ClibGlobalArrayIndex> &idx_vec) {
696     
697     ComlibPrintf("%d: Collecting stats %d\n", CkMyPe(), numStatsReceived);
698
699     numStatsReceived ++;
700     clib_gstats.updateStats(stat, pe);
701     
702     for(int count = 0; count < idx_vec.length(); count++) {
703         int old_pe = CkpvAccess(locationTable)->get(idx_vec[count]);
704         
705         ComlibPrintf("Adding idx %d to %d\n", idx_vec[count].idx.data()[0], 
706                      pe);
707         
708         CkpvAccess(locationTable)->put(idx_vec[count]) = pe + CkNumPes();
709     }        
710
711     //Reset cached array element index. Location table may have changed
712     CkpvAccess(cache_index).nInts = -1;
713     CkpvAccess(cache_aid).setZero();
714
715     if(numStatsReceived == CkNumPes()) {
716         numStatsReceived = 0;
717
718         for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; 
719             count++ ){
720             Strategy* strat = CkpvAccess(conv_com_ptr)->getStrategy(count);
721             if(strat->getType() > CONVERSE_STRATEGY) {
722                 CharmStrategy *cstrat = (CharmStrategy *)strat;
723                 ComlibLearner *learner = cstrat->getLearner();
724                 CharmStrategy *newstrat = NULL;
725                                 
726                 if(learner != NULL) {
727                     ComlibPrintf("Calling Learner\n");
728                     newstrat = (CharmStrategy *)learner->optimizePattern
729                         (strat, clib_gstats);
730                     
731                     if(newstrat != NULL)
732                         ListOfStrategies.enq(newstrat);
733                     else
734                         ListOfStrategies.enq(cstrat);
735                 }
736             }
737         }
738         barrierReached = 1;
739         
740         //if(lbUpdateReceived) {
741         //lbUpdateReceived = CmiFalse;
742         broadcastStrategies();
743         //}
744     }
745 }
746
747 void ComlibManager::setRemote(int remote_pe){
748
749     ComlibPrintf("Setting remote flag on\n");
750
751     remotePe = remote_pe;
752     isRemote = 1;
753 }
754
755
756 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
757                                       int strat_id) {
758     setInstance(strat_id);
759     
760     int nmsgs = rq.length();
761
762     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
763
764     for(int count = 0; count < nmsgs; count++) {
765         char *msg = rq.deq()->getCharmMessage();
766         envelope *env = UsrToEnv(msg);
767         
768         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
769                   env->getsetArrayMgr());
770     }
771
772     endIteration();
773 }
774
775 void ComlibManager::sendRemote(){
776     
777     int nmsgs = remoteQ.length();
778
779     //if(nmsgs == 0)
780     //  return;
781
782     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
783
784     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
785     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
786     
787     for(int count = 0; count < nmsgs; count++) {
788         CharmMessageHolder *cmsg = remoteQ.deq();
789         CkFreeMsg(cmsg->getCharmMessage());
790         delete cmsg;
791     }
792 }
793
794
795 void ComlibManager::AtSync() {
796
797     //comm_debug = 1;
798     ComlibPrintf("[%d] In ComlibManager::Atsync, controller %d, ite %d\n", CkMyPe(), curComlibController, clibIteration);
799
800     barrier2Reached = 0;
801     receivedTable = 0;
802     barrierReached = 0;
803     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
804
805     int pos = 0;
806
807     CkVec<ClibGlobalArrayIndex> gidx_vec;
808
809     CkVec<CkArrayID> tmp_vec;
810     for(int count = 0; count < CkpvAccess(conv_com_ptr)->nstrats; count ++) {
811         if((* strategyTable)[count].strategy->getType() == ARRAY_STRATEGY) {
812             CharmStrategy *cstrat = (CharmStrategy*)
813                 ((* strategyTable)[count].strategy);
814             
815             CkArrayID src, dest;
816             CkArrayIndexMax *elements;
817             int nelem;
818             
819             cstrat->ainfo.getSourceArray(src, elements, nelem);
820             cstrat->ainfo.getDestinationArray(dest, elements, nelem);
821
822             CmiBool srcflag = CmiFalse;
823             CmiBool destflag = CmiFalse;
824             
825             if(src == dest || dest.isZero())
826                 destflag = CmiTrue;
827
828             if(src.isZero())
829                 srcflag = CmiTrue;                        
830
831             for(pos = 0; pos < tmp_vec.size(); pos++) {
832                 if(tmp_vec[pos] == src)
833                     srcflag = CmiTrue;
834
835                 if(tmp_vec[pos] == dest)
836                     destflag = CmiTrue;
837
838                 if(srcflag && destflag)
839                     break;
840             }
841
842             if(!srcflag)
843                 tmp_vec.insertAtEnd(src);
844
845             if(!destflag)
846                 tmp_vec.insertAtEnd(dest);
847         }
848         
849         //cant do it here, done in receiveTable
850         //if((* strategyTable)[count].strategy->getType() > CONVERSE_STRATEGY)
851         //  (* strategyTable)[count].reset();
852     }
853
854     for(pos = 0; pos < tmp_vec.size(); pos++) {
855         CkArrayID aid = tmp_vec[pos];
856
857         ComlibArrayListener *calistener = 
858             CkArrayID::CkLocalBranch(aid)->getComlibArrayListener();
859
860         CkVec<CkArrayIndexMax> idx_vec;
861         calistener->getLocalIndices(idx_vec);
862
863         for(int idx_count = 0; idx_count < idx_vec.size(); idx_count++) {
864             ClibGlobalArrayIndex gindex;
865             gindex.aid = aid;
866             gindex.idx = idx_vec[idx_count];
867
868             gidx_vec.insertAtEnd(gindex);
869         }
870     }
871
872     cgproxy[curComlibController].collectStats(clib_stats, CkMyPe(), gidx_vec);
873     clib_stats.reset();
874 }
875
876 #include "lbdb.h"
877 #include "CentralLB.h"
878
879 /******** FOO BAR : NEEDS to be consistent with array manager *******/
880 void LDObjID2IdxMax (LDObjid ld_id, CkArrayIndexMax &idx) {
881     if(OBJ_ID_SZ < CK_ARRAYINDEX_MAXLEN)
882         CkAbort("LDB OBJ ID smaller than array index\n");
883     
884     //values higher than CkArrayIndexMax should be 0
885     for(int count = 0; count < CK_ARRAYINDEX_MAXLEN; count ++) {
886         idx.data()[count] = ld_id.id[count];
887     }
888     idx.nInts = 1;
889 }
890
891 void ComlibManager::lbUpdate(LBMigrateMsg *msg) {
892     for(int count = 0; count < msg->n_moves; count ++) {
893         MigrateInfo m = msg->moves[count];
894
895         CkArrayID aid; CkArrayIndexMax idx;
896         aid = CkArrayID(m.obj.omhandle.id.id);
897         LDObjID2IdxMax(m.obj.id, idx);
898
899         ClibGlobalArrayIndex cid; 
900         cid.aid = aid;
901         cid.idx = idx;
902         
903         int pe = CkpvAccess(locationTable)->get(cid);
904
905         //Value exists in the table, so update it
906         if(pe != 0) {
907             pe = m.to_pe + CkNumPes();
908             CkpvAccess(locationTable)->getRef(cid) = pe;
909         }
910         //otherwise we dont care about these objects
911     }   
912
913     lbUpdateReceived = CmiTrue;
914     if(barrierReached) {
915         broadcastStrategies();
916         barrierReached = 0;
917     }
918
919     CkFreeMsg(msg);
920 }
921
922 CkDelegateData* ComlibManager::ckCopyDelegateData(CkDelegateData *data) {
923     ComlibInstanceHandle *inst = new ComlibInstanceHandle
924         (*((ComlibInstanceHandle *)data));
925     return inst;
926 }
927
928
929 CkDelegateData * ComlibManager::DelegatePointerPup(PUP::er &p,
930                                                    CkDelegateData *pd) {
931
932     if(pd == NULL)
933         return NULL;
934
935     CmiBool to_pup = CmiFalse;
936
937     ComlibInstanceHandle *inst; 
938     if(!p.isUnpacking()) {
939         inst = (ComlibInstanceHandle *) pd;       
940         if(pd != NULL)
941             to_pup = CmiTrue;
942     }
943     else 
944         //Call migrate constructor
945         inst = new ComlibInstanceHandle();
946
947     p | to_pup;
948
949     if(to_pup)
950         inst->pup(p);
951     return inst;
952 }    
953
954
955 void ComlibDelegateProxy(CProxy *proxy){
956     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
957     proxy->ckDelegate(cgproxy.ckLocalBranch(), NULL);
958 }
959
960 void ComlibAssociateProxy(CharmStrategy *strat, CProxy &proxy) {
961     ComlibInstanceHandle *cinst = new ComlibInstanceHandle
962         (CkGetComlibInstance());
963
964     cinst->setStrategy(strat);
965     
966     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
967     proxy.ckDelegate(cgproxy.ckLocalBranch(), cinst);
968
969
970 void ComlibBegin(CProxy &proxy) {
971     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
972     cinst->beginIteration();
973
974
975 void ComlibEnd(CProxy &proxy) {
976     ComlibInstanceHandle *cinst = (ComlibInstanceHandle *)proxy.ckDelegatedPtr();
977     cinst->endIteration();
978 }
979
980 ComlibInstanceHandle CkCreateComlibInstance(){
981     return CkGetComlibInstance();
982 }
983
984 ComlibInstanceHandle CkGetComlibInstance() {
985     if(CkMyPe() != 0)
986         CkAbort("Comlib Instance can only be created on Processor 0");
987     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
988     return (cgproxy.ckLocalBranch())->createInstance();
989 }
990
991 ComlibInstanceHandle CkGetComlibInstance(int id) {
992     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
993     return cinst;
994 }
995
996 void ComlibDoneCreating(){
997     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
998     (cgproxy.ckLocalBranch())->broadcastStrategies();
999 }
1000
1001 char *router;
1002 int sfactor=0;
1003
1004 class ComlibManagerMain {
1005 public:
1006     ComlibManagerMain(CkArgMsg *msg) {
1007         
1008         if(CkMyPe() == 0 && msg !=  NULL)
1009             CmiGetArgString(msg->argv, "+strategy", &router);         
1010
1011         if(CkMyPe() == 0 && msg !=  NULL)
1012             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
1013         
1014         CProxy_ComlibManager::ckNew();
1015     }
1016 };
1017
1018 //Called by user code
1019 ComlibInstanceHandle::ComlibInstanceHandle() : CkDelegateData() {
1020     _instid = -1;
1021     _dmid.setZero();
1022     _srcPe = -1;
1023     toForward = 0;
1024 }
1025
1026 //Called by user code
1027 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h) 
1028     : CkDelegateData() {
1029     _instid = h._instid;
1030     _dmid = h._dmid;
1031     toForward = h.toForward;        
1032
1033     ComlibPrintf("In Copy Constructor\n");
1034     _srcPe = h._srcPe;
1035
1036     reset();
1037     ref();
1038 }
1039
1040 ComlibInstanceHandle& ComlibInstanceHandle::operator=(const ComlibInstanceHandle &h) {
1041     _instid = h._instid;
1042     _dmid = h._dmid;
1043     toForward = h.toForward;
1044     _srcPe = h._srcPe;
1045     
1046     reset();
1047     ref();
1048     return *this;
1049 }
1050
1051 //Called by the communication library
1052 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
1053     _instid = instid;
1054     _dmid   = dmid;
1055     _srcPe  = -1;
1056     toForward = 0;
1057 }
1058
1059 void ComlibInstanceHandle::beginIteration() { 
1060     CProxy_ComlibManager cgproxy(_dmid);
1061
1062     ComlibPrintf("Instance Handle beginIteration %d, %d, %d\n", CkMyPe(), _srcPe, _instid);
1063
1064     //User forgot to make the instance handle a readonly or pass it
1065     //into the constructor of an array and is using it directly from
1066     //Main :: main
1067     if(_srcPe == -1) {
1068         //ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
1069         _srcPe = CkMyPe();
1070     }
1071
1072     if(_srcPe != CkMyPe() && toForward) {
1073         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
1074     }
1075
1076     (cgproxy.ckLocalBranch())->setInstance(_instid);
1077     (cgproxy.ckLocalBranch())->beginIteration();   
1078 }
1079
1080 void ComlibInstanceHandle::endIteration() {
1081     CProxy_ComlibManager cgproxy(_dmid);
1082     (cgproxy.ckLocalBranch())->endIteration();
1083 }
1084
1085 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
1086     toForward = s->getForwardOnMigration();
1087     CProxy_ComlibManager cgproxy(_dmid);
1088     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
1089 }
1090
1091 CharmStrategy *ComlibInstanceHandle::getStrategy() {
1092     if(_instid < 0) 
1093         return NULL;    
1094     
1095     CProxy_ComlibManager cgproxy(_dmid);
1096     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
1097 }
1098
1099 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
1100
1101 void ComlibInitSectionID(CkSectionID &sid){
1102     
1103     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
1104     sid._cookie.pe = CkMyPe();
1105     
1106     sid._cookie.sInfo.cInfo.id = 0;    
1107     sid.npes = 0;
1108     sid.pelist = NULL;
1109 }
1110
1111 void ComlibResetSectionProxy(CProxySection_ArrayBase *sproxy) {
1112     CkSectionID &sid = sproxy->ckGetSectionID();
1113     ComlibInitSectionID(sid);
1114     sid._cookie.sInfo.cInfo.status = 0;
1115 }
1116
1117 // for backward compatibility - for old name commlib
1118 void _registercommlib(void)
1119 {
1120   static int _done = 0; 
1121   if(_done) 
1122       return; 
1123   _done = 1;
1124   _registercomlib();
1125 }
1126
1127 void ComlibAtSyncHandler(void *msg) {
1128     CmiFree(msg);
1129     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1130     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1131     if(cmgr_ptr)
1132         cmgr_ptr->AtSync();    
1133 }
1134
1135 void ComlibNotifyMigrationDoneHandler(void *msg) {
1136     CmiFree(msg);
1137     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1138     ComlibManager *cmgr_ptr = cgproxy.ckLocalBranch();
1139     if(cmgr_ptr)
1140         cmgr_ptr->AtSync();    
1141 }
1142
1143
1144 void ComlibLBMigrationUpdate(LBMigrateMsg *msg) {
1145     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
1146     (cgproxy.ckLocalBranch())->lbUpdate(msg);
1147 }
1148
1149 #include "comlib.def.h"
1150
1151