adding a hook to get a strategy pointer in the user program from the instancehandle
[charm.git] / src / ck-com / ComlibManager.C
1 #include "ComlibManager.h"
2
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_comm_ptr);
22
23 //handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     if(msg == NULL)
27         return;
28     
29     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
30
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     /*
37     CProxyElement_ArrayBase ap(env->getsetArrayMgr(), env->getsetArrayIndex());
38     ComlibPrintf("%d:Array Base created\n", CkMyPe());
39     ap.ckSend((CkArrayMessage *)EnvToUsr(env), env->getsetArrayEp());
40     */
41     
42     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
43     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue);
44
45     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
46     return;
47 }
48
49 void recv_combined_array_msg(void *msg){
50     if(msg == NULL)
51         return;
52     
53     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
54
55     MsgPacker::deliver((CombinedMessage *)msg);
56 }
57
58 ComlibManager::ComlibManager(){
59     init();
60     ComlibPrintf("In comlibmanager constructor\n");
61 }
62
63 void ComlibManager::init(){
64     
65     PUPable_reg(CharmStrategy);
66     PUPable_reg(CharmMessageHolder);
67     
68     //comm_debug = 1;
69     
70     CkpvInitialize(int, RecvmsgHandle);
71     CkpvAccess(RecvmsgHandle) = CkRegisterHandler((CmiHandler)recv_array_msg);
72
73     bcast_pelist = new int [CkNumPes()];
74     for(int bcount = 0; bcount < CkNumPes(); bcount++)
75         bcast_pelist[bcount] = bcount;
76
77     CkpvInitialize(int, RecvCombinedShortMsgHdlrIdx);
78     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
79         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
80     
81     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
82     
83     npes = CkNumPes();
84     pelist = NULL;
85     nstrats = 0;
86
87     CkpvInitialize(CkGroupID, cmgrID);
88     CkpvAccess(cmgrID) = thisgroup;
89
90     dummyArrayIndex.nInts = 0;
91
92     curStratID = 0;
93     prevStratID = -1;
94     //prioEndIterationFlag = 1;
95
96     strategyTable = CkpvAccess(conv_comm_ptr)->getStrategyTable();
97     
98     receivedTable = 0;
99     flushTable = 0;
100     totalMsgCount = 0;
101     totalBytes = 0;
102     nIterations = 0;
103     barrierReached = 0;
104     barrier2Reached = 0;
105
106     isRemote = 0;
107     remotePe = -1;
108
109     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
110     cgproxy[0].barrier();
111 }
112
113 //First barrier makes sure that the communication library group 
114 //has been created on all processors
115 void ComlibManager::barrier(){
116   static int bcount = 0;
117   ComlibPrintf("In barrier %d\n", bcount);
118   if(CkMyPe() == 0) {
119     bcount ++;
120     if(bcount == CkNumPes()){
121       barrierReached = 1;
122       doneCreating();
123     }
124   }
125 }
126
127 //Has finished passing the strategy list to all the processors
128 void ComlibManager::barrier2(){
129   static int bcount = 0;
130   if(CkMyPe() == 0) {
131     bcount ++;
132     ComlibPrintf("In barrier2 %d\n", bcount);
133     if(bcount == CkNumPes()) {
134         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
135         cgproxy.resumeFromBarrier2();
136     }
137   }
138 }
139
140 //Registers a set of strategies with the communication library
141 ComlibInstanceHandle ComlibManager::createInstance() {
142   
143     ListOfStrategies.insertAtEnd(NULL);
144     nstrats++;
145     
146     ComlibInstanceHandle cinst(nstrats - 1, CkpvAccess(cmgrID));  
147     return cinst;
148 }
149
150 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
151     ListOfStrategies[pos] = strat;
152 }
153
154 //End of registering function, if barriers have been reached send them over
155 void ComlibManager::doneCreating() {
156     if(!barrierReached)
157       return;    
158
159     ComlibPrintf("Sending Strategies %d, %d\n", nstrats, 
160                  ListOfStrategies.length());
161
162     if(nstrats == 0)
163         return;
164
165     StrategyWrapper sw;
166     sw.s_table = new Strategy* [nstrats];
167     sw.nstrats = nstrats;
168     
169     for (int count=0; count<nstrats; count++)
170         sw.s_table[count] = ListOfStrategies[count];
171
172     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
173     cgproxy.receiveTable(sw);
174 }
175
176 //Called when the array/group element starts sending messages
177 void ComlibManager::beginIteration(){
178     //right now does not do anything might need later
179     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
180     //prioEndIterationFlag = 0;
181 }
182
183 void ComlibManager::setInstance(int instID){
184
185     curStratID = instID;
186     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
187 }
188
189 //called when the array elements has finished sending messages
190 void ComlibManager::endIteration(){
191     //    prioEndIterationFlag = 1;
192     prevStratID = -1;
193     
194     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), curStratID, 
195                  (* strategyTable)[curStratID].elementCount, (* strategyTable)[curStratID].numElements);
196
197     if(isRemote) {
198         isRemote = 0;
199         sendRemote();
200         return;
201     }
202
203     if(!receivedTable) {
204         (* strategyTable)[curStratID].nEndItr++;
205         return;
206     }        
207     
208     (* strategyTable)[curStratID].elementCount++;
209     int count = 0;
210     flushTable = 1;
211     
212     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
213         
214         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
215         
216         nIterations ++;
217         
218         if(nIterations == LEARNING_PERIOD) {
219             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
220             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
221             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
222         }
223         
224         if(barrier2Reached) {       
225             (* strategyTable)[curStratID].strategy->doneInserting();
226         }
227         else (* strategyTable)[curStratID].call_doneInserting = 1;
228         
229         (* strategyTable)[curStratID].elementCount = 0;
230     }
231     ComlibPrintf("After EndIteration\n");
232 }
233
234 //receive the list of strategies
235 //Insert the strategies into the strategy table in converse comm lib.
236 //CpvAccess(conv_comm_ptr) points to the converse commlib instance
237 void ComlibManager::receiveTable(StrategyWrapper sw){
238     
239     ComlibPrintf("[%d] In receiveTable %d\n", CkMyPe(), sw.nstrats);
240
241     receivedTable = 1;
242     nstrats = sw.nstrats;
243
244     CkArrayID st_aid;
245     int st_nelements;
246     CkArrayIndexMax *st_elem;
247
248     int count = 0;
249     for(count = 0; count < nstrats; count ++) {
250         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
251         
252         //set the instance to the current count
253         //currently all strategies are being copied to all processors
254         //later strategies will be selectively copied
255         cur_strategy->setInstance(count);  
256         CkpvAccess(conv_comm_ptr)->insertStrategy(cur_strategy);
257         
258         ComlibPrintf("[%d] Inserting strategy \n", CkMyPe());       
259
260         if(cur_strategy->getType() == ARRAY_STRATEGY &&
261            cur_strategy->isBracketed()){ 
262
263             ComlibPrintf("Inserting Array Listener\n");
264
265             ComlibArrayInfo as = ((CharmStrategy *)cur_strategy)->ainfo;
266             as.getSourceArray(st_aid, st_elem, st_nelements);
267             
268             if(st_aid.isZero())
269                 CkAbort("Array ID is zero");
270             
271             ComlibArrayListener *calistener = 
272                 CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
273             
274             calistener->registerStrategy(&((* strategyTable)[count]));
275         }              
276   
277         if(cur_strategy->getType() == GROUP_STRATEGY){
278             (* strategyTable)[count].numElements = 1;
279         }
280         
281         cur_strategy->beginProcessing((* strategyTable)[count].numElements); 
282         
283         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
284                      CkMyPe(), count,
285                      (* strategyTable)[count].nEndItr);
286                          
287         curStratID = count;
288         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
289             endIteration();            
290     }           
291     
292     ComlibPrintf("receivedTable %d\n", nstrats);
293     
294     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
295     cgproxy[0].barrier2();
296 }
297
298 void ComlibManager::resumeFromBarrier2(){
299     barrier2Reached = 1;
300     
301     ComlibPrintf("[%d] Barrier 2 reached\n", CkMyPe());
302
303     //    if(flushTable) {
304     for (int count = 0; count < nstrats; count ++) {
305         if (!(* strategyTable)[count].tmplist.isEmpty()) {
306             CharmMessageHolder *cptr;
307             while (!(* strategyTable)[count].tmplist.isEmpty())
308                 (* strategyTable)[count].strategy->insertMessage
309                     ((* strategyTable)[count].tmplist.deq());
310         }
311         
312         if ((* strategyTable)[count].call_doneInserting) {
313             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
314             (* strategyTable)[count].strategy->doneInserting();
315         }
316     }
317     //}
318     
319     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
320 }
321
322 extern int _charmHandlerIdx;
323
324 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
325                               const CkArrayIndexMax &idx, CkArrayID a){
326     
327     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
328     /*
329     if(curStratID != prevStratID && prioEndIterationFlag) {        
330         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
331         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
332                      CkMyPe());
333         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg();
334         int mprio = -100;
335         *(int *)CkPriorityPtr(pmsg) = mprio;
336         pmsg->instID = curStratID;
337         CkSetQueueing(pmsg, CK_QUEUEING_BFIFO);
338         cgproxy[CkMyPe()].prioEndIteration(pmsg);
339         prioEndIterationFlag = 0;
340     }        
341     prevStratID = curStratID;            
342     */
343
344     CkArrayIndexMax myidx = idx;
345     int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
346     
347     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
348     //   UsrToEnv(msg)->getTotalsize(), receivedTable);
349
350     register envelope * env = UsrToEnv(msg);
351     
352     env->getsetArrayMgr()=a;
353     env->getsetArraySrcPe()=CkMyPe();
354     env->getsetArrayEp()=ep;
355     env->getsetArrayHops()=0;
356     env->getsetArrayIndex()=idx;
357     env->setUsed(0);
358     
359     CkPackMessage(&env);
360     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
361     
362     if(isRemote) {
363         CharmMessageHolder *cmsg = new 
364             CharmMessageHolder((char *)msg, dest_proc);
365
366         remoteQ.enq(cmsg);
367         return;
368     }
369
370     if(dest_proc == CkMyPe()){
371         CProxyElement_ArrayBase ap(a,idx);
372         ap.ckSend((CkArrayMessage *)msg, ep);
373         return;
374     }
375
376     totalMsgCount ++;
377     totalBytes += UsrToEnv(msg)->getTotalsize();
378
379     CharmMessageHolder *cmsg = new 
380         CharmMessageHolder((char *)msg, dest_proc);
381     //get rid of the new.
382
383     ComlibPrintf("Before Insert\n");
384
385     if (receivedTable)
386       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
387     else {
388         flushTable = 1;
389         (* strategyTable)[curStratID].tmplist.enq(cmsg);
390     }
391
392     //CmiPrintf("After Insert\n");
393 }
394
395
396 #include "qd.h"
397 //CpvExtern(QdState*, _qd);
398
399 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
400     
401     int dest_proc = onPE;
402     /*
403     if(curStratID != prevStratID && prioEndIterationFlag) {        
404         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
405         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
406                      CkMyPe());
407         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg;
408         *(int *)CkPriorityPtr(pmsg) = -0x7FFFFFFF;
409         CkSetQueueing(pmsg, CK_QUEUEING_IFIFO);
410         cgproxy[CkMyPe()].prioEndIteration(pmsg);
411         prioEndIterationFlag = 0;
412     }        
413     prevStratID = curStratID;            
414     */
415
416     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
417                  UsrToEnv(msg)->getTotalsize());
418
419     register envelope * env = UsrToEnv(msg);
420     if(dest_proc == CkMyPe()){
421         _SET_USED(env, 0);
422         CkSendMsgBranch(ep, msg, dest_proc, gid);
423         return;
424     }
425     
426     CpvAccess(_qd)->create(1);
427
428     env->setMsgtype(ForBocMsg);
429     env->setEpIdx(ep);
430     env->setGroupNum(gid);
431     env->setSrcPe(CkMyPe());
432     env->setUsed(0);
433
434     CkPackMessage(&env);
435     CmiSetHandler(env, _charmHandlerIdx);
436
437     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
438     //get rid of the new.
439     
440     if(receivedTable)
441         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
442     else {
443         flushTable = 1;
444         (* strategyTable)[curStratID].tmplist.enq(cmsg);
445     }
446 }
447
448 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
449     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
450
451     register envelope * env = UsrToEnv(m);
452     env->getsetArrayMgr()=a;
453     env->getsetArraySrcPe()=CkMyPe();
454     env->getsetArrayEp()=ep;
455     env->getsetArrayHops()=0;
456     env->getsetArrayIndex()= dummyArrayIndex;
457     
458     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
459
460     CkSectionInfo minfo;
461     minfo.type = COMLIB_MULTICAST_MESSAGE;
462     minfo.sInfo.cInfo.instId = curStratID;
463     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
464     minfo.sInfo.cInfo.id = 0; 
465     minfo.pe = CkMyPe();
466     ((CkMcastBaseMsg *)m)->_cookie = minfo;       
467
468     CharmMessageHolder *cmsg = new 
469         CharmMessageHolder((char *)m, IS_MULTICAST);
470     cmsg->npes = CkNumPes();
471     cmsg->pelist = bcast_pelist;
472     cmsg->sec_id = NULL;
473
474     multicast(cmsg);
475 }
476
477 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, CkArrayID a, 
478                                      CkSectionID &s) {
479
480 #ifndef CMK_OPTIMIZE
481     traceUserEvent(section_send_event);
482 #endif
483
484     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
485
486     register envelope * env = UsrToEnv(m);
487     env->getsetArrayMgr()=a;
488     env->getsetArraySrcPe()=CkMyPe();
489     env->getsetArrayEp()=ep;
490     env->getsetArrayHops()=0;
491     env->getsetArrayIndex()= dummyArrayIndex;
492     
493     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
494     
495     env->setUsed(0);    
496     CkPackMessage(&env);
497     
498     totalMsgCount ++;
499     totalBytes += env->getTotalsize();
500
501     //Provide a dummy dest proc as it does not matter for mulitcast 
502     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
503     cmsg->npes = 0;
504     cmsg->sec_id = &s;
505
506     CkSectionInfo minfo;
507     minfo.type = COMLIB_MULTICAST_MESSAGE;
508     minfo.sInfo.cInfo.instId = curStratID;
509     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
510     minfo.sInfo.cInfo.id = 0; 
511     minfo.pe = CkMyPe();
512     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
513     //    s.npes = 0;
514     //s.pelist = NULL;
515
516     multicast(cmsg);
517 }
518
519 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
520     register envelope * env = UsrToEnv(m);
521
522     CpvAccess(_qd)->create(1);
523
524     env->setMsgtype(ForBocMsg);
525     env->setEpIdx(ep);
526     env->setGroupNum(g);
527     env->setSrcPe(CkMyPe());
528     env->setUsed(0);
529
530     CkPackMessage(&env);
531     CmiSetHandler(env, _charmHandlerIdx);
532     
533     //Provide a dummy dest proc as it does not matter for mulitcast 
534     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
535     
536     cmsg->npes = 0;
537     cmsg->pelist = NULL;
538
539     multicast(cmsg);
540 }
541
542 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
543
544     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
545     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
546
547     env->setUsed(0);    
548     CkPackMessage(&env);
549
550     //Will be used to detect multicast message for learning
551     totalMsgCount ++;
552     totalBytes += env->getTotalsize();
553     
554     if (receivedTable)
555         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
556     else {
557         flushTable = 1;
558         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
559         (* strategyTable)[curStratID].tmplist.enq(cmsg);
560     }
561
562     ComlibPrintf("After multicast\n");
563 }
564
565 /*
566 void ComlibManager::multicast(void *msg, int npes, int *pelist) {
567     register envelope * env = UsrToEnv(msg);
568     
569     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
570
571     env->setUsed(0);    
572     CkPackMessage(&env);
573     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
574     
575     totalMsgCount ++;
576     totalBytes += env->getTotalsize();
577
578     CharmMessageHolder *cmsg = new 
579     CharmMessageHolder((char *)msg,IS_MULTICAST);
580     cmsg->npes = npes;
581     cmsg->pelist = pelist;
582     //Provide a dummy dest proc as it does not matter for mulitcast 
583     //get rid of the new.
584     
585     if (receivedTable)
586         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
587     else {
588         flushTable = 1;
589         ComlibPrintf("Enqueuing message in tmplist\n");
590         (* strategyTable)[curStratID].tmplist.enq(cmsg);
591     }
592
593     ComlibPrintf("After multicast\n");
594 }
595 */
596
597
598 void ComlibManager::learnPattern(int total_msg_count, int total_bytes) {
599     static int nrecvd = 0;
600     static double avg_message_count = 0;
601     static double avg_message_bytes = 0;
602
603     avg_message_count += ((double) total_msg_count) / LEARNING_PERIOD;
604     avg_message_bytes += ((double) total_bytes) /  LEARNING_PERIOD;
605
606     nrecvd ++;
607     
608     if(nrecvd == CkNumPes()) {
609         //Number of messages and bytes a processor sends in each iteration
610         avg_message_count /= CkNumPes();
611         avg_message_bytes /= CkNumPes();
612         
613         //CkPrintf("STATS = %5.3lf, %5.3lf", avg_message_count,
614         //avg_message_bytes);
615
616         //Learning, ignoring contention for now! 
617         double cost_dir, cost_mesh, cost_grid, cost_hyp;
618         double p=(double)CkNumPes();
619         cost_dir = ALPHA * avg_message_count + BETA * avg_message_bytes;
620         cost_mesh = ALPHA * 2 * sqrt(p) + BETA * avg_message_bytes * 2;
621         cost_grid = ALPHA * 3 * pow(p,1.0/3.0) + BETA * avg_message_bytes * 3;
622         cost_hyp =  (log(p)/log(2.0))*(ALPHA  + BETA * avg_message_bytes/2.0);
623         
624         // Find the one with the minimum cost!
625         int min_strat = USE_MESH; 
626         double min_cost = cost_mesh;
627         if(min_cost > cost_hyp)
628             min_strat = USE_HYPERCUBE;
629         if(min_cost > cost_grid)
630             min_strat = USE_GRID;
631
632         if(min_cost > cost_dir)
633             min_strat = USE_DIRECT;
634
635         switchStrategy(min_strat);        
636     }
637 }
638
639 void ComlibManager::switchStrategy(int strat){
640     //CkPrintf("Switching to %d\n", strat);
641 }
642
643 void ComlibManager::setRemote(int remote_pe){
644
645     ComlibPrintf("Setting remote flag on\n");
646
647     remotePe = remote_pe;
648     isRemote = 1;
649 }
650
651
652 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
653                                       int strat_id) {
654     setInstance(strat_id);
655     
656     int nmsgs = rq.length();
657
658     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
659
660     for(int count = 0; count < nmsgs; count++) {
661         char *msg = rq.deq()->getCharmMessage();
662         envelope *env = UsrToEnv(msg);
663         
664         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
665                   env->getsetArrayMgr());
666     }
667
668     endIteration();
669 }
670
671 void ComlibManager::sendRemote(){
672     
673     int nmsgs = remoteQ.length();
674
675     //if(nmsgs == 0)
676     //  return;
677
678     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
679
680     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
681     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
682     
683     for(int count = 0; count < nmsgs; count++) {
684         CharmMessageHolder *cmsg = remoteQ.deq();
685         CkFreeMsg(cmsg->getCharmMessage());
686         delete cmsg;
687     }
688 }
689
690
691 /*
692 void ComlibManager::prioEndIteration(PrioMsg *pmsg){
693     CkPrintf("[%d] In Prio End Iteration\n", CkMyPe());
694     setInstance(pmsg->instID);
695     endIteration();
696     delete pmsg;
697 }
698 */
699
700 void ComlibDelegateProxy(CProxy *proxy){
701     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
702     proxy->ckDelegate(cgproxy.ckLocalBranch());
703 }
704
705 ComlibInstanceHandle CkCreateComlibInstance(){
706     return CkGetComlibInstance();
707 }
708
709 ComlibInstanceHandle CkGetComlibInstance() {
710     if(CkMyPe() != 0)
711         CkAbort("Comlib Instance can only be created on Processor 0");
712     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
713     return (cgproxy.ckLocalBranch())->createInstance();
714 }
715
716 ComlibInstanceHandle CkGetComlibInstance(int id) {
717     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
718     return cinst;
719 }
720
721 void ComlibDoneCreating(){
722     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
723     (cgproxy.ckLocalBranch())->doneCreating();
724 }
725
726 char *router;
727 int sfactor=0;
728
729 class ComlibManagerMain {
730 public:
731     ComlibManagerMain(CkArgMsg *msg) {
732         
733         if(CkMyPe() == 0 && msg !=  NULL)
734             CmiGetArgString(msg->argv, "+strategy", &router);         
735
736         if(CkMyPe() == 0 && msg !=  NULL)
737             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
738         
739         CProxy_ComlibManager::ckNew();
740     }
741 };
742
743 //Called by user code
744 ComlibInstanceHandle::ComlibInstanceHandle(){
745     _instid = -1;
746     _dmid.setZero();
747     _srcPe = -1;
748     toForward = 0;
749 }
750
751 //Called by user code
752 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h){
753     _instid = h._instid;
754     _dmid = h._dmid;
755     toForward = h.toForward;
756
757     ComlibPrintf("In Copy Constructor\n");
758
759     //We DO NOT copy the source processor
760     //Source PE is initialized here
761     _srcPe = CkMyPe();
762 }
763
764 void ComlibInstanceHandle::init(){
765     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
766     *this = (cgproxy.ckLocalBranch())->createInstance();
767 }
768
769 //Called by the communication library
770 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
771     _instid = instid;
772     _dmid   = dmid;
773     _srcPe  = -1;
774     toForward = 0;
775 }
776
777 void ComlibInstanceHandle::beginIteration() { 
778     CProxy_ComlibManager cgproxy(_dmid);
779
780     ComlibPrintf("Instance Handle beginIteration %d, %d\n", CkMyPe(), _srcPe);
781
782     //User forgot to make the instance handle a readonly or pass it
783     //into the constructor of an array and is using it directly from
784     //Main :: main
785     if(_srcPe == -1) {
786         ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
787         _srcPe = CkMyPe();
788     }
789
790     if(_srcPe != CkMyPe() && toForward) {
791         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
792     }
793
794     (cgproxy.ckLocalBranch())->setInstance(_instid);
795     (cgproxy.ckLocalBranch())->beginIteration();   
796 }
797
798 void ComlibInstanceHandle::endIteration() {
799     CProxy_ComlibManager cgproxy(_dmid);
800     (cgproxy.ckLocalBranch())->endIteration();
801 }
802
803 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
804     toForward = s->getForwardOnMigration();
805     CProxy_ComlibManager cgproxy(_dmid);
806     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
807 }
808
809 CharmStrategy *ComlibInstanceHandle::getStrategy() {
810     if(_instid < 0) 
811         return NULL;    
812     
813     CProxy_ComlibManager cgproxy(_dmid);
814     return (cgproxy.ckLocalBranch())->getStrategy(_instid);
815 }
816
817 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
818
819 void ComlibInitSectionID(CkSectionID &sid){
820     
821     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
822     sid._cookie.pe = CkMyPe();
823     
824     sid._cookie.sInfo.cInfo.id = 0;    
825     sid.npes = 0;
826     sid.pelist = NULL;
827 }
828
829 // for backward compatibility - for old name commlib
830 void _registercommlib(void)
831 {
832   static int _done = 0; if(_done) return; _done = 1;
833   _registercomlib();
834 }
835
836 #include "comlib.def.h"
837
838