New version with migration and forwarding always working. The test program also works.
[charm.git] / src / ck-com / ComlibManager.C
1 #include "ComlibManager.h"
2
3 #include "EachToManyMulticastStrategy.h"
4 #include "DirectMulticastStrategy.h"
5 #include "StreamingStrategy.h"
6 #include "DummyStrategy.h"
7 #include "MPIStrategy.h"
8 #include "NodeMulticast.h"
9 #include "MsgPacker.h"
10 #include "RingMulticastStrategy.h"
11 #include "PipeBroadcastStrategy.h"
12 #include "BroadcastStrategy.h"
13 #include "MeshStreamingStrategy.h"
14 #include "PrioStreaming.h"
15
16 CkpvExtern(int, RecvdummyHandle);
17
18 CkpvDeclare(int, RecvmsgHandle);
19 CkpvDeclare(int, RecvCombinedShortMsgHdlrIdx);
20 CkpvDeclare(CkGroupID, cmgrID);
21 CkpvExtern(ConvComlibManager *, conv_comm_ptr);
22
23 //handler to receive array messages
24 void recv_array_msg(void *msg){
25
26     if(msg == NULL)
27         return;
28     
29     ComlibPrintf("%d:In recv_msg\n", CkMyPe());
30
31     register envelope* env = (envelope *)msg;
32     env->setUsed(0);
33     env->getsetArrayHops()=1;
34     CkUnpackMessage(&env);
35
36     /*
37     CProxyElement_ArrayBase ap(env->getsetArrayMgr(), env->getsetArrayIndex());
38     ComlibPrintf("%d:Array Base created\n", CkMyPe());
39     ap.ckSend((CkArrayMessage *)EnvToUsr(env), env->getsetArrayEp());
40     */
41     
42     CkArray *a=(CkArray *)_localBranch(env->getsetArrayMgr());
43     a->deliver((CkArrayMessage *)EnvToUsr(env), CkDeliver_queue, CmiTrue);    
44
45     ComlibPrintf("%d:Out of recv_msg\n", CkMyPe());
46     return;
47 }
48
49 void recv_combined_array_msg(void *msg){
50     if(msg == NULL)
51         return;
52     
53     ComlibPrintf("%d:In recv_combined_array_msg\n", CkMyPe());
54
55     MsgPacker::deliver((CombinedMessage *)msg);
56 }
57
58 ComlibManager::ComlibManager(){
59     init();
60     ComlibPrintf("In comlibmanager constructor\n");
61 }
62
63 void ComlibManager::init(){
64     
65     PUPable_reg(CharmStrategy);
66     PUPable_reg(CharmMessageHolder);
67     
68     //comm_debug = 1;
69     
70     CkpvAccess(RecvmsgHandle) = CkRegisterHandler((CmiHandler)recv_array_msg);
71     CkpvAccess(RecvCombinedShortMsgHdlrIdx) = 
72         CkRegisterHandler((CmiHandler)recv_combined_array_msg);
73     
74     section_send_event = traceRegisterUserEvent("ArraySectionMulticast");
75     
76     npes = CkNumPes();
77     pelist = NULL;
78     nstrats = 0;
79
80     CkpvInitialize(CkGroupID, cmgrID);
81     CkpvAccess(cmgrID) = thisgroup;
82
83     dummyArrayIndex.nInts = 0;
84
85     curStratID = 0;
86     prevStratID = -1;
87     //prioEndIterationFlag = 1;
88
89     strategyTable = CkpvAccess(conv_comm_ptr)->getStrategyTable();
90     
91     receivedTable = 0;
92     flushTable = 0;
93     totalMsgCount = 0;
94     totalBytes = 0;
95     nIterations = 0;
96     barrierReached = 0;
97     barrier2Reached = 0;
98
99     isRemote = 0;
100     remotePe = -1;
101
102     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
103     cgproxy[0].barrier();
104 }
105
106 //First barrier makes sure that the communication library group 
107 //has been created on all processors
108 void ComlibManager::barrier(){
109   static int bcount = 0;
110   ComlibPrintf("In barrier %d\n", bcount);
111   if(CkMyPe() == 0) {
112     bcount ++;
113     if(bcount == CkNumPes()){
114       barrierReached = 1;
115       doneCreating();
116     }
117   }
118 }
119
120 //Has finished passing the strategy list to all the processors
121 void ComlibManager::barrier2(){
122   static int bcount = 0;
123   if(CkMyPe() == 0) {
124     bcount ++;
125     ComlibPrintf("In barrier2 %d\n", bcount);
126     if(bcount == CkNumPes()) {
127         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
128         cgproxy.resumeFromBarrier2();
129     }
130   }
131 }
132
133 //Registers a set of strategies with the communication library
134 ComlibInstanceHandle ComlibManager::createInstance() {
135   
136     ListOfStrategies.insertAtEnd(NULL);
137     nstrats++;
138     
139     ComlibInstanceHandle cinst(nstrats - 1, CkpvAccess(cmgrID));  
140     return cinst;
141 }
142
143 void ComlibManager::registerStrategy(int pos, CharmStrategy *strat) {
144     ListOfStrategies[pos] = strat;
145 }
146
147 //End of registering function, if barriers have been reached send them over
148 void ComlibManager::doneCreating() {
149     if(!barrierReached)
150       return;    
151
152     ComlibPrintf("Sending Strategies %d, %d\n", nstrats, 
153                  ListOfStrategies.length());
154
155     if(nstrats == 0)
156         return;
157
158     StrategyWrapper sw;
159     sw.s_table = new Strategy* [nstrats];
160     sw.nstrats = nstrats;
161     
162     for (int count=0; count<nstrats; count++)
163         sw.s_table[count] = ListOfStrategies[count];
164
165     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
166     cgproxy.receiveTable(sw);
167 }
168
169 //Called when the array/group element starts sending messages
170 void ComlibManager::beginIteration(){
171     //right now does not do anything might need later
172     ComlibPrintf("[%d]:In Begin Iteration %d\n", CkMyPe(), (* strategyTable)[0].elementCount);
173     //prioEndIterationFlag = 0;
174 }
175
176 void ComlibManager::setInstance(int instID){
177
178     curStratID = instID;
179     ComlibPrintf("[%d]:In setInstance\n", CkMyPe(), (* strategyTable)[instID].elementCount);
180 }
181
182 //called when the array elements has finished sending messages
183 void ComlibManager::endIteration(){
184     //    prioEndIterationFlag = 1;
185     prevStratID = -1;
186     
187     ComlibPrintf("[%d]:In End Iteration(%d) %d, %d\n", CkMyPe(), curStratID, 
188                  (* strategyTable)[curStratID].elementCount, (* strategyTable)[curStratID].numElements);
189
190     if(isRemote) {
191         isRemote = 0;
192         sendRemote();
193         return;
194     }
195
196     if(!receivedTable) {
197         (* strategyTable)[curStratID].nEndItr++;
198         return;
199     }        
200     
201     (* strategyTable)[curStratID].elementCount++;
202     int count = 0;
203     flushTable = 1;
204     
205     if((* strategyTable)[curStratID].elementCount == (* strategyTable)[curStratID].numElements) {
206         
207         ComlibPrintf("[%d]:In End Iteration %d\n", CkMyPe(), (* strategyTable)[curStratID].elementCount);
208         
209         nIterations ++;
210         
211         if(nIterations == LEARNING_PERIOD) {
212             //CkPrintf("Sending %d, %d\n", totalMsgCount, totalBytes);
213             CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
214             cgproxy[0].learnPattern(totalMsgCount, totalBytes);
215         }
216         
217         if(barrier2Reached) {       
218             (* strategyTable)[curStratID].strategy->doneInserting();
219         }
220         else (* strategyTable)[curStratID].call_doneInserting = 1;
221         
222         (* strategyTable)[curStratID].elementCount = 0;
223     }
224     ComlibPrintf("After EndIteration\n");
225 }
226
227 //receive the list of strategies
228 //Insert the strategies into the strategy table in converse comm lib.
229 //CpvAccess(conv_comm_ptr) points to the converse commlib instance
230 void ComlibManager::receiveTable(StrategyWrapper sw){
231     
232     ComlibPrintf("[%d] In receiveTable %d\n", CkMyPe(), sw.nstrats);
233
234     receivedTable = 1;
235     nstrats = sw.nstrats;
236
237     CkArrayID st_aid;
238     int st_nelements;
239     CkArrayIndexMax *st_elem;
240
241     int count = 0;
242     for(count = 0; count < nstrats; count ++) {
243         CharmStrategy *cur_strategy = (CharmStrategy *)sw.s_table[count];
244         
245         //set the instance to the current count
246         //currently all strategies are being copied to all processors
247         //later strategies will be selectively copied
248         cur_strategy->setInstance(count);  
249         CkpvAccess(conv_comm_ptr)->insertStrategy(cur_strategy);
250         
251         ComlibPrintf("[%d] Inserting strategy \n", CkMyPe());       
252
253         if(cur_strategy->getType() == ARRAY_STRATEGY &&
254            cur_strategy->isBracketed()){ 
255
256             ComlibPrintf("Inserting Array Listener\n");
257
258             ComlibArrayInfo as = ((CharmStrategy *)cur_strategy)->ainfo;
259             as.getSourceArray(st_aid, st_elem, st_nelements);
260             
261             if(st_aid.isZero())
262                 CkAbort("Array ID is zero");
263             
264             ComlibArrayListener *calistener = 
265                 CkArrayID::CkLocalBranch(st_aid)->getComlibArrayListener();
266             
267             calistener->registerStrategy(&((* strategyTable)[count]));
268         }              
269   
270         if(cur_strategy->getType() == GROUP_STRATEGY){
271             (* strategyTable)[count].numElements = 1;
272         }
273         
274         cur_strategy->beginProcessing((* strategyTable)[count].numElements); 
275         
276         ComlibPrintf("[%d] endIteration from receiveTable %d, %d\n", 
277                      CkMyPe(), count,
278                      (* strategyTable)[count].nEndItr);
279                          
280         curStratID = count;
281         for(int itr = 0; itr < (* strategyTable)[count].nEndItr; itr++) 
282             endIteration();            
283     }           
284     
285     ComlibPrintf("receivedTable %d\n", nstrats);
286     
287     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
288     cgproxy[0].barrier2();
289 }
290
291 void ComlibManager::resumeFromBarrier2(){
292     barrier2Reached = 1;
293     
294     ComlibPrintf("[%d] Barrier 2 reached\n", CkMyPe());
295
296     //    if(flushTable) {
297     for (int count = 0; count < nstrats; count ++) {
298         if (!(* strategyTable)[count].tmplist.isEmpty()) {
299             CharmMessageHolder *cptr;
300             while (!(* strategyTable)[count].tmplist.isEmpty())
301                 (* strategyTable)[count].strategy->insertMessage
302                     ((* strategyTable)[count].tmplist.deq());
303         }
304         
305         if ((* strategyTable)[count].call_doneInserting) {
306             ComlibPrintf("[%d] Calling done inserting \n", CkMyPe());
307             (* strategyTable)[count].strategy->doneInserting();
308         }
309     }
310     //}
311     
312     ComlibPrintf("[%d] After Barrier2\n", CkMyPe());
313 }
314
315 extern int _charmHandlerIdx;
316
317 void ComlibManager::ArraySend(CkDelegateData *pd,int ep, void *msg, 
318                               const CkArrayIndexMax &idx, CkArrayID a){
319     
320     ComlibPrintf("[%d] In Array Send\n", CkMyPe());
321     /*
322     if(curStratID != prevStratID && prioEndIterationFlag) {        
323         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
324         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
325                      CkMyPe());
326         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg();
327         int mprio = -100;
328         *(int *)CkPriorityPtr(pmsg) = mprio;
329         pmsg->instID = curStratID;
330         CkSetQueueing(pmsg, CK_QUEUEING_BFIFO);
331         cgproxy[CkMyPe()].prioEndIteration(pmsg);
332         prioEndIterationFlag = 0;
333     }        
334     prevStratID = curStratID;            
335     */
336
337     CkArrayIndexMax myidx = idx;
338     int dest_proc = CkArrayID::CkLocalBranch(a)->lastKnown(myidx);
339     
340     //ComlibPrintf("Send Data %d %d %d %d\n", CkMyPe(), dest_proc, 
341     //   UsrToEnv(msg)->getTotalsize(), receivedTable);
342
343     register envelope * env = UsrToEnv(msg);
344     
345     env->getsetArrayMgr()=a;
346     env->getsetArraySrcPe()=CkMyPe();
347     env->getsetArrayEp()=ep;
348     env->getsetArrayHops()=0;
349     env->getsetArrayIndex()=idx;
350     env->setUsed(0);
351     
352     CkPackMessage(&env);
353     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
354     
355     if(isRemote) {
356         CharmMessageHolder *cmsg = new 
357             CharmMessageHolder((char *)msg, dest_proc);
358
359         remoteQ.enq(cmsg);
360         return;
361     }
362
363     if(dest_proc == CkMyPe()){
364         CProxyElement_ArrayBase ap(a,idx);
365         ap.ckSend((CkArrayMessage *)msg, ep);
366         return;
367     }
368
369     totalMsgCount ++;
370     totalBytes += UsrToEnv(msg)->getTotalsize();
371
372     CharmMessageHolder *cmsg = new 
373         CharmMessageHolder((char *)msg, dest_proc);
374     //get rid of the new.
375
376     ComlibPrintf("Before Insert\n");
377
378     if (receivedTable)
379       (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
380     else {
381         flushTable = 1;
382         (* strategyTable)[curStratID].tmplist.enq(cmsg);
383     }
384
385     //CmiPrintf("After Insert\n");
386 }
387
388
389 #include "qd.h"
390 //CpvExtern(QdState*, _qd);
391
392 void ComlibManager::GroupSend(CkDelegateData *pd,int ep, void *msg, int onPE, CkGroupID gid){
393     
394     int dest_proc = onPE;
395     /*
396     if(curStratID != prevStratID && prioEndIterationFlag) {        
397         CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
398         ComlibPrintf("[%d] Array Send calling prio end iteration\n", 
399                      CkMyPe());
400         PrioMsg *pmsg = new(8 * sizeof(int)) PrioMsg;
401         *(int *)CkPriorityPtr(pmsg) = -0x7FFFFFFF;
402         CkSetQueueing(pmsg, CK_QUEUEING_IFIFO);
403         cgproxy[CkMyPe()].prioEndIteration(pmsg);
404         prioEndIterationFlag = 0;
405     }        
406     prevStratID = curStratID;            
407     */
408
409     ComlibPrintf("Send Data %d %d %d\n", CkMyPe(), dest_proc, 
410                  UsrToEnv(msg)->getTotalsize());
411
412     register envelope * env = UsrToEnv(msg);
413     if(dest_proc == CkMyPe()){
414         _SET_USED(env, 0);
415         CkSendMsgBranch(ep, msg, dest_proc, gid);
416         return;
417     }
418     
419     CpvAccess(_qd)->create(1);
420
421     env->setMsgtype(ForBocMsg);
422     env->setEpIdx(ep);
423     env->setGroupNum(gid);
424     env->setSrcPe(CkMyPe());
425     env->setUsed(0);
426
427     CkPackMessage(&env);
428     CmiSetHandler(env, _charmHandlerIdx);
429
430     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)msg, dest_proc); 
431     //get rid of the new.
432     
433     if(receivedTable)
434         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
435     else {
436         flushTable = 1;
437         (* strategyTable)[curStratID].tmplist.enq(cmsg);
438     }
439 }
440
441 void ComlibManager::ArrayBroadcast(CkDelegateData *pd,int ep,void *m,CkArrayID a){
442     ComlibPrintf("[%d] Array Broadcast \n", CkMyPe());
443
444     register envelope * env = UsrToEnv(m);
445     env->getsetArrayMgr()=a;
446     env->getsetArraySrcPe()=CkMyPe();
447     env->getsetArrayEp()=ep;
448     env->getsetArrayHops()=0;
449     env->getsetArrayIndex()= dummyArrayIndex;
450     
451     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
452
453     CkSectionInfo minfo;
454     minfo.type = COMLIB_MULTICAST_MESSAGE;
455     minfo.sInfo.cInfo.instId = curStratID;
456     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
457     minfo.sInfo.cInfo.id = 0; 
458     minfo.pe = CkMyPe();
459     ((CkMcastBaseMsg *)m)->_cookie = minfo;       
460
461     CharmMessageHolder *cmsg = new 
462         CharmMessageHolder((char *)m, IS_MULTICAST);
463     cmsg->npes = 0;
464     cmsg->pelist = NULL;
465     cmsg->sec_id = NULL;
466
467     multicast(cmsg);
468 }
469
470 void ComlibManager::ArraySectionSend(CkDelegateData *pd,int ep, void *m, CkArrayID a, 
471                                      CkSectionID &s) {
472
473 #ifndef CMK_OPTIMIZE
474     traceUserEvent(section_send_event);
475 #endif
476
477     ComlibPrintf("[%d] Array Section Send \n", CkMyPe());
478
479     register envelope * env = UsrToEnv(m);
480     env->getsetArrayMgr()=a;
481     env->getsetArraySrcPe()=CkMyPe();
482     env->getsetArrayEp()=ep;
483     env->getsetArrayHops()=0;
484     env->getsetArrayIndex()= dummyArrayIndex;
485     
486     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
487     
488     env->setUsed(0);    
489     CkPackMessage(&env);
490     
491     totalMsgCount ++;
492     totalBytes += env->getTotalsize();
493
494     //Provide a dummy dest proc as it does not matter for mulitcast 
495     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
496     cmsg->npes = 0;
497     cmsg->sec_id = &s;
498
499     CkSectionInfo minfo;
500     minfo.type = COMLIB_MULTICAST_MESSAGE;
501     minfo.sInfo.cInfo.instId = curStratID;
502     minfo.sInfo.cInfo.status = COMLIB_MULTICAST_ALL;  
503     minfo.sInfo.cInfo.id = 0; 
504     minfo.pe = CkMyPe();
505     ((CkMcastBaseMsg *)m)->_cookie = minfo;    
506     //    s.npes = 0;
507     //s.pelist = NULL;
508
509     multicast(cmsg);
510 }
511
512 void ComlibManager::GroupBroadcast(CkDelegateData *pd,int ep,void *m,CkGroupID g) {
513     register envelope * env = UsrToEnv(m);
514
515     CpvAccess(_qd)->create(1);
516
517     env->setMsgtype(ForBocMsg);
518     env->setEpIdx(ep);
519     env->setGroupNum(g);
520     env->setSrcPe(CkMyPe());
521     env->setUsed(0);
522
523     CkPackMessage(&env);
524     CmiSetHandler(env, _charmHandlerIdx);
525     
526     //Provide a dummy dest proc as it does not matter for mulitcast 
527     CharmMessageHolder *cmsg = new CharmMessageHolder((char *)m,IS_MULTICAST);
528     
529     cmsg->npes = 0;
530     cmsg->pelist = NULL;
531
532     multicast(cmsg);
533 }
534
535 void ComlibManager::multicast(CharmMessageHolder *cmsg) {
536
537     register envelope * env = UsrToEnv(cmsg->getCharmMessage());    
538     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
539
540     env->setUsed(0);    
541     CkPackMessage(&env);
542
543     //Will be used to detect multicast message for learning
544     totalMsgCount ++;
545     totalBytes += env->getTotalsize();
546     
547     if (receivedTable)
548         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
549     else {
550         flushTable = 1;
551         ComlibPrintf("Enqueuing message in tmplist at %d\n", curStratID);
552         (* strategyTable)[curStratID].tmplist.enq(cmsg);
553     }
554
555     ComlibPrintf("After multicast\n");
556 }
557
558 /*
559 void ComlibManager::multicast(void *msg, int npes, int *pelist) {
560     register envelope * env = UsrToEnv(msg);
561     
562     ComlibPrintf("[%d]: In multicast\n", CkMyPe());
563
564     env->setUsed(0);    
565     CkPackMessage(&env);
566     CmiSetHandler(env, CkpvAccess(RecvmsgHandle));
567     
568     totalMsgCount ++;
569     totalBytes += env->getTotalsize();
570
571     CharmMessageHolder *cmsg = new 
572     CharmMessageHolder((char *)msg,IS_MULTICAST);
573     cmsg->npes = npes;
574     cmsg->pelist = pelist;
575     //Provide a dummy dest proc as it does not matter for mulitcast 
576     //get rid of the new.
577     
578     if (receivedTable)
579         (* strategyTable)[curStratID].strategy->insertMessage(cmsg);
580     else {
581         flushTable = 1;
582         ComlibPrintf("Enqueuing message in tmplist\n");
583         (* strategyTable)[curStratID].tmplist.enq(cmsg);
584     }
585
586     ComlibPrintf("After multicast\n");
587 }
588 */
589
590
591 void ComlibManager::learnPattern(int total_msg_count, int total_bytes) {
592     static int nrecvd = 0;
593     static double avg_message_count = 0;
594     static double avg_message_bytes = 0;
595
596     avg_message_count += ((double) total_msg_count) / LEARNING_PERIOD;
597     avg_message_bytes += ((double) total_bytes) /  LEARNING_PERIOD;
598
599     nrecvd ++;
600     
601     if(nrecvd == CkNumPes()) {
602         //Number of messages and bytes a processor sends in each iteration
603         avg_message_count /= CkNumPes();
604         avg_message_bytes /= CkNumPes();
605         
606         //CkPrintf("STATS = %5.3lf, %5.3lf", avg_message_count,
607         //avg_message_bytes);
608
609         //Learning, ignoring contention for now! 
610         double cost_dir, cost_mesh, cost_grid, cost_hyp;
611         double p=(double)CkNumPes();
612         cost_dir = ALPHA * avg_message_count + BETA * avg_message_bytes;
613         cost_mesh = ALPHA * 2 * sqrt(p) + BETA * avg_message_bytes * 2;
614         cost_grid = ALPHA * 3 * pow(p,1.0/3.0) + BETA * avg_message_bytes * 3;
615         cost_hyp =  (log(p)/log(2.0))*(ALPHA  + BETA * avg_message_bytes/2.0);
616         
617         // Find the one with the minimum cost!
618         int min_strat = USE_MESH; 
619         double min_cost = cost_mesh;
620         if(min_cost > cost_hyp)
621             min_strat = USE_HYPERCUBE;
622         if(min_cost > cost_grid)
623             min_strat = USE_GRID;
624
625         if(min_cost > cost_dir)
626             min_strat = USE_DIRECT;
627
628         switchStrategy(min_strat);        
629     }
630 }
631
632 void ComlibManager::switchStrategy(int strat){
633     //CkPrintf("Switching to %d\n", strat);
634 }
635
636 void ComlibManager::setRemote(int remote_pe){
637
638     ComlibPrintf("Setting remote flag on\n");
639
640     remotePe = remote_pe;
641     isRemote = 1;
642 }
643
644
645 void ComlibManager::receiveRemoteSend(CkQ<CharmMessageHolder *> &rq, 
646                                       int strat_id) {
647     setInstance(strat_id);
648     
649     int nmsgs = rq.length();
650
651     ComlibPrintf("%d: Receiving remote message\n", CkMyPe());
652
653     for(int count = 0; count < nmsgs; count++) {
654         char *msg = rq.deq()->getCharmMessage();
655         envelope *env = UsrToEnv(msg);
656         
657         ArraySend(NULL, env->getsetArrayEp(), msg, env->getsetArrayIndex(), 
658                   env->getsetArrayMgr());
659     }
660
661     endIteration();
662 }
663
664 void ComlibManager::sendRemote(){
665     
666     int nmsgs = remoteQ.length();
667
668     //if(nmsgs == 0)
669     //  return;
670
671     ComlibPrintf("%d: Sending remote message \n", CkMyPe());
672
673     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID)); 
674     cgproxy[remotePe].receiveRemoteSend(remoteQ, curStratID);
675     
676     for(int count = 0; count < nmsgs; count++) {
677         CharmMessageHolder *cmsg = remoteQ.deq();
678         CkFreeMsg(cmsg->getCharmMessage());
679         delete cmsg;
680     }
681 }
682
683
684 /*
685 void ComlibManager::prioEndIteration(PrioMsg *pmsg){
686     CkPrintf("[%d] In Prio End Iteration\n", CkMyPe());
687     setInstance(pmsg->instID);
688     endIteration();
689     delete pmsg;
690 }
691 */
692
693 void ComlibDelegateProxy(CProxy *proxy){
694     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
695     proxy->ckDelegate(cgproxy.ckLocalBranch());
696 }
697
698 ComlibInstanceHandle CkCreateComlibInstance(){
699     return CkGetComlibInstance();
700 }
701
702 ComlibInstanceHandle CkGetComlibInstance() {
703     if(CkMyPe() != 0)
704         CkAbort("Comlib Instance can only be created on Processor 0");
705     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
706     return (cgproxy.ckLocalBranch())->createInstance();
707 }
708
709 ComlibInstanceHandle CkGetComlibInstance(int id) {
710     ComlibInstanceHandle cinst(id, CkpvAccess(cmgrID));
711     return cinst;
712 }
713
714 void ComlibDoneCreating(){
715     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));
716     (cgproxy.ckLocalBranch())->doneCreating();
717 }
718
719 char *router;
720 int sfactor=0;
721
722 class ComlibManagerMain {
723 public:
724     ComlibManagerMain(CkArgMsg *msg) {
725         
726         if(CkMyPe() == 0 && msg !=  NULL)
727             CmiGetArgString(msg->argv, "+strategy", &router);         
728
729         if(CkMyPe() == 0 && msg !=  NULL)
730             CmiGetArgInt(msg->argv, "+spanning_factor", &sfactor);
731         
732         CProxy_ComlibManager::ckNew();
733     }
734 };
735
736 //Called by user code
737 ComlibInstanceHandle::ComlibInstanceHandle(){
738     _instid = -1;
739     _dmid.setZero();
740     _srcPe = -1;
741     toForward = 0;
742 }
743
744 //Called by user code
745 ComlibInstanceHandle::ComlibInstanceHandle(const ComlibInstanceHandle &h){
746     _instid = h._instid;
747     _dmid = h._dmid;
748     toForward = h.toForward;
749
750     ComlibPrintf("In Copy Constructor\n");
751
752     //We DO NOT copy the source processor
753     //Source PE is initialized here
754     _srcPe = CkMyPe();
755 }
756
757 void ComlibInstanceHandle::init(){
758     CProxy_ComlibManager cgproxy(CkpvAccess(cmgrID));    
759     *this = (cgproxy.ckLocalBranch())->createInstance();
760 }
761
762 //Called by the communication library
763 ComlibInstanceHandle::ComlibInstanceHandle(int instid, CkGroupID dmid){
764     _instid = instid;
765     _dmid   = dmid;
766     _srcPe  = -1;
767     toForward = 0;
768 }
769
770 void ComlibInstanceHandle::beginIteration() { 
771     CProxy_ComlibManager cgproxy(_dmid);
772
773     ComlibPrintf("Instance Handle beginIteration %d, %d\n", CkMyPe(), _srcPe);
774
775     //User forgot to make the instance handle a readonly or pass it
776     //into the constructor of an array and is using it directly from
777     //Main :: main
778     if(_srcPe == -1) {
779         ComlibPrintf("Warning:Instance Handle needs to be a readonly or a private variable of an array element\n");
780         _srcPe = CkMyPe();
781     }
782
783     if(_srcPe != CkMyPe() && toForward) {
784         (cgproxy.ckLocalBranch())->setRemote(_srcPe);
785     }
786
787     (cgproxy.ckLocalBranch())->setInstance(_instid);
788     (cgproxy.ckLocalBranch())->beginIteration();   
789 }
790
791 void ComlibInstanceHandle::endIteration() {
792     CProxy_ComlibManager cgproxy(_dmid);
793     (cgproxy.ckLocalBranch())->endIteration();
794 }
795
796 void ComlibInstanceHandle::setStrategy(CharmStrategy *s) {
797     toForward = s->getForwardOnMigration();
798     CProxy_ComlibManager cgproxy(_dmid);
799     (cgproxy.ckLocalBranch())->registerStrategy(_instid, s);
800 }
801
802 CkGroupID ComlibInstanceHandle::getComlibManagerID() {return _dmid;}    
803
804 void ComlibInitSectionID(CkSectionID &sid){
805
806     sid._cookie.type = COMLIB_MULTICAST_MESSAGE;
807     sid._cookie.pe = CkMyPe();
808
809     sid._cookie.sInfo.cInfo.id = 0;    
810     sid.npes = 0;
811     sid.pelist = NULL;
812 }
813
814 // for backward compatibility - for old name commlib
815 void _registercommlib(void)
816 {
817   static int _done = 0; if(_done) return; _done = 1;
818   _registercomlib();
819 }
820
821 #include "comlib.def.h"
822
823