Rename the majority of remaining C files in the RTS to C++
[charm.git] / src / conv-ldb / cldb.neighbor.C
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define USE_MULTICAST           0
10 #define IDLE_IMMEDIATE          1
11 #define TRACE_USEREVENTS        1
12
13 #define PERIOD 20                /* default: 30 */
14 #define MAXOVERLOAD 1
15
16 static int  LBPeriod = PERIOD;                 /* time to call load balancing */
17 static int  overload_threshold = MAXOVERLOAD;
18
19 typedef struct CldProcInfo_s {
20   double lastCheck;
21   int    sent;                  /* flag to disable idle work request */
22   int    balanceEvt;            /* user event for balancing */
23   int    updateLoadEvt; 
24   int    idleEvt;               /* user event for idle balancing */
25   int    idleprocEvt;           /* user event for processing idle req */
26 } *CldProcInfo;
27
28 CMI_EXTERNC_VARIABLE char *_lbtopo;                     /* topology name string */
29 int _lbsteal = 0;                       /* work stealing flag */
30
31 CMI_EXTERNC
32 void gengraph(int, int, int, int *, int *);
33
34 CpvStaticDeclare(CldProcInfo, CldData);
35 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
36 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
37 CpvStaticDeclare(int, MinLoad);
38 CpvStaticDeclare(int, MinProc);
39 CpvStaticDeclare(int, Mindex);
40 CpvStaticDeclare(int, start);
41
42 #if ! USE_MULTICAST
43 CpvStaticDeclare(loadmsg *, msgpool);
44
45 static
46 #if CMK_C_INLINE
47 inline 
48 #endif
49 loadmsg *getPool(void){
50   loadmsg *msg;
51   if (CpvAccess(msgpool)!=NULL)  {
52     msg = CpvAccess(msgpool);
53     CpvAccess(msgpool) = msg->next;
54   }
55   else {
56     msg = (loadmsg *)CmiAlloc(sizeof(loadmsg));
57     CmiSetHandler(msg, CpvAccess(CldLoadResponseHandlerIndex));
58   }
59   return msg;
60 }
61
62 static
63 #if CMK_C_INLINE
64 inline 
65 #endif
66 void putPool(loadmsg *msg)
67 {
68   msg->next = CpvAccess(msgpool);
69   CpvAccess(msgpool) = msg;
70 }
71
72 #endif
73
74 void LoadNotifyFn(int l)
75 {
76   CldProcInfo  cldData = CpvAccess(CldData);
77   cldData->sent = 0;
78 }
79
80 const char *CldGetStrategy(void)
81 {
82   return "neighbor";
83 }
84
85 /* since I am idle, ask for work from neighbors */
86 static void CldBeginIdle(void *dummy)
87 {
88   CpvAccess(CldData)->lastCheck = CmiWallTimer();
89 }
90
91 static void CldEndIdle(void *dummy)
92 {
93   CpvAccess(CldData)->lastCheck = -1;
94 }
95
96 static void CldStillIdle(void *dummy, double curT)
97 {
98   int i;
99   double startT;
100   requestmsg msg;
101   int myload;
102   CldProcInfo  cldData = CpvAccess(CldData);
103
104   double now = curT;
105   double lt = cldData->lastCheck;
106   /* only ask for work every 20ms */
107   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
108   cldData->lastCheck = now;
109
110   myload = CldCountTokens();
111   if (myload > 0) return;
112
113   msg.from_pe = CmiMyPe();
114   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
115 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
116   /* fixme */
117   CmiBecomeImmediate(&msg);
118   for (i=0; i<CpvAccess(numNeighbors); i++) {
119     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
120     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
121   }
122 #else
123   msg.to_rank = -1;
124   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
125 #endif
126   cldData->sent = 1;
127
128 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
129   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
130 #endif
131 }
132
133 /* immediate message handler, work at node level */
134 /* send some work to requested proc */
135 static void CldAskLoadHandler(requestmsg *msg)
136 {
137   int receiver, rank, recvIdx, i;
138   int myload = CldCountTokens();
139   double now = CmiWallTimer();
140
141   /* only give you work if I have more than 1 */
142   if (myload>0) {
143     int sendLoad;
144     receiver = msg->from_pe;
145     rank = CmiMyRank();
146     if (msg->to_rank != -1) rank = msg->to_rank;
147 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
148     /* try the lock */
149     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
150       CmiDelayImmediate();              /* postpone immediate message */
151       return;
152     }
153     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
154 #endif
155     sendLoad = myload / CpvAccess(numNeighbors) / 2;
156     if (sendLoad < 1) sendLoad = 1;
157     sendLoad = 1;
158     for (i=0; i<CpvAccess(numNeighbors); i++) 
159       if (CpvAccess(neighbors)[i].pe == receiver) break;
160     
161     if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;}   /* ? */
162     CpvAccess(neighbors)[i].load += sendLoad;
163     CldMultipleSend(receiver, sendLoad, rank, 0);
164 #if 0
165 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
166     /* this is dangerous since projections logging is not thread safe */
167     {
168     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
169     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
170     }
171 #endif
172 #endif
173   }
174   CmiFree(msg);
175 }
176
177 /* balancing by exchanging load among neighbors */
178
179 void CldSendLoad(void)
180 {
181 #if CMK_MULTICORE
182   /* directly send load to neighbors */
183   double myload = CldCountTokens();
184   int nNeighbors = CpvAccess(numNeighbors);
185   int i;
186   for (i=0; i<nNeighbors; i++) {
187     int neighbor_pe = CpvAccess(neighbors)[i].pe;
188     int j, found=0;
189     for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
190       if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
191       {
192         CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
193         found = 1;
194         break;
195       }
196   }
197 #else
198 #if USE_MULTICAST
199   loadmsg msg;
200
201   msg.pe = CmiMyPe();
202   msg.load = CldCountTokens();
203   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
204   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
205   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
206 #else
207   int i;
208   int mype = CmiMyPe();
209   int myload = CldCountTokens();
210   for(i=0; i<CpvAccess(numNeighbors); i++) {
211     loadmsg *msg = getPool();
212     msg->fromindex = i;
213     msg->toindex = CpvAccess(neighbors)[i].index;
214     msg->pe = mype;
215     msg->load = myload;
216     CmiSyncSendAndFree(CpvAccess(neighbors)[i].pe, sizeof(loadmsg), msg);
217   }
218 #endif
219 #endif
220 }
221
222 int CldMinAvg(void)
223 {
224   int sum=0, i;
225   int myload;
226
227   int nNeighbors = CpvAccess(numNeighbors);
228   if (CpvAccess(start) == -1)
229     CpvAccess(start) = CmiMyPe() % nNeighbors;
230
231 #if 0
232     /* update load from neighbors for multicore */
233   for (i=0; i<nNeighbors; i++) {
234     CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
235   }
236 #endif
237   CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
238   CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
239   sum = CpvAccess(neighbors)[CpvAccess(start)].load;
240   CpvAccess(Mindex) = CpvAccess(start);
241   for (i=1; i<nNeighbors; i++) {
242     CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
243     sum += CpvAccess(neighbors)[CpvAccess(start)].load;
244     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
245       CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
246       CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
247       CpvAccess(Mindex) = CpvAccess(start);
248     }
249   }
250   CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
251   myload = CldCountTokens();
252   sum += myload;
253   if (myload < CpvAccess(MinLoad)) {
254     CpvAccess(MinLoad) = myload;
255     CpvAccess(MinProc) = CmiMyPe();
256   }
257   i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
258   return i;
259 }
260
261 void CldBalance(void *dummy, double curT)
262 {
263   int i, j, overload, numToMove=0, avgLoad;
264   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
265
266 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
267   double startT = curT;
268 #endif
269
270 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
271   avgLoad = CldMinAvg();
272 /*
273   overload = CldLoad() - avgLoad;
274   if (overload > CldCountTokens())
275     overload = CldCountTokens();
276 */
277   overload = CldCountTokens() - avgLoad;
278
279   if (overload > overload_threshold) {
280     int nNeighbors = CpvAccess(numNeighbors);
281     for (i=0; i<nNeighbors; i++)
282       if (CpvAccess(neighbors)[i].load < avgLoad) {
283         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
284         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
285           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
286         numUnderAvg++;
287       }
288     if (numUnderAvg > 0)  {
289       int myrank = CmiMyRank();
290       for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
291         j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
292         if (CpvAccess(neighbors)[j].load < avgLoad) {
293           numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
294           if (numToMove > overload)
295               numToMove = overload;
296           overload -= numToMove;
297           CpvAccess(neighbors)[j].load += numToMove;
298 #if CMK_MULTICORE || CMK_USE_IBVERBS
299           CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove, myrank);
300 #else
301           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
302                           numToMove, myrank, 
303 #if CMK_SMP
304                           0
305 #else
306                           1
307 #endif
308                           );
309 #endif
310         }
311       }
312     }             /* end of numUnderAvg > 0 */
313   }
314   CldSendLoad();
315 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
316   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
317 #endif
318 }
319
320 void CldBalancePeriod(void *dummy, double curT)
321 {
322     CldBalance(NULL, curT);
323     CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, LBPeriod, CmiMyPe());
324 }
325
326
327 void CldLoadResponseHandler(loadmsg *msg)
328 {
329   int i;
330   
331 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
332   double startT = CmiWallTimer();
333 #endif
334 #if USE_MULTICAST
335   for(i=0; i<CpvAccess(numNeighbors); i++)
336     if (CpvAccess(neighbors)[i].pe == msg->pe) {
337       CpvAccess(neighbors)[i].load = msg->load;
338       break;
339     }
340   CmiFree(msg);
341 #else
342   int index = msg->toindex;
343   if (index == -1) {
344     for(i=0; i<CpvAccess(numNeighbors); i++)
345       if (CpvAccess(neighbors)[i].pe == msg->pe) {
346         index = i;
347         break;
348       }
349   }
350   if (index != -1) {    /* index can be -1, if neighbors table not init yet */
351     CpvAccess(neighbors)[index].load = msg->load;
352     if (CpvAccess(neighbors)[index].index == -1) CpvAccess(neighbors)[index].index = msg->fromindex;
353   }
354   putPool(msg);
355 #endif
356 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
357   traceUserBracketEvent(CpvAccess(CldData)->updateLoadEvt, startT, CmiWallTimer());
358 #endif
359 }
360
361 void CldBalanceHandler(void *msg)
362 {
363   CldRestoreHandler((char *)msg);
364   CldPutToken((char *)msg);
365 }
366
367 void CldHandler(void *msg)
368 {
369   CldInfoFn ifn; CldPackFn pfn;
370   int len, queueing, priobits; unsigned int *prioptr;
371   
372   CldRestoreHandler((char *)msg);
373   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
374   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
375   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
376   /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
377 }
378
379 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
380 {
381   int len, queueing, priobits,i; unsigned int *prioptr;
382   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
383   CldPackFn pfn;
384   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
385   if (pfn) {
386     pfn(&msg);
387     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
388   }
389   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
390   CmiSetInfo(msg,infofn);
391
392   CmiSyncMulticastAndFree(grp, len, msg);
393 }
394
395 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
396 {
397   int len, queueing, priobits,i; unsigned int *prioptr;
398   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
399   CldPackFn pfn;
400   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
401   if (pfn) {
402     pfn(&msg);
403     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
404   }
405   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
406   CmiSetInfo(msg,infofn);
407   /*
408   for(i=0;i<npes;i++) {
409     CmiSyncSend(pes[i], len, msg);
410   }
411   CmiFree(msg);
412   */
413   CmiSyncListSendAndFree(npes, pes, len, msg);
414 }
415
416 void CldEnqueue(int pe, void *msg, int infofn)
417 {
418   int len, queueing, priobits, avg; unsigned int *prioptr;
419   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
420   CldPackFn pfn;
421
422   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
423     avg = CldMinAvg();
424     if (CldCountTokens() < avg)
425       pe = CmiMyPe();
426     else
427       pe = CpvAccess(MinProc);
428 #if CMK_NODE_QUEUE_AVAILABLE
429     if (CmiNodeOf(pe) == CmiMyNode()) {
430       CldNodeEnqueue(CmiMyNode(), msg, infofn);
431       return;
432     }
433 #endif
434     /* always pack the message because the message may be move away
435        to a different processor later by CldGetToken() */
436     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
437     if (pfn && CmiNumNodes()>1) {
438        pfn(&msg);
439        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
440     }
441     if (pe != CmiMyPe()) {
442       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
443       CpvAccess(CldRelocatedMessages)++;
444       CmiSetInfo(msg,infofn);
445       CldSwitchHandler((char *)msg, CpvAccess(CldBalanceHandlerIndex));
446       CmiSyncSendAndFree(pe, len, msg);
447     }
448     else {
449       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
450       CmiSetInfo(msg,infofn);
451       CldPutToken((char *)msg);
452     }
453   } 
454   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
455     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
456     /*CmiSetInfo(msg,infofn);*/
457     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
458     /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
459   }
460   else {
461     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
462     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
463       pfn(&msg);
464       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
465     }
466     CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
467     CmiSetInfo(msg,infofn);
468     if (pe==CLD_BROADCAST) 
469       CmiSyncBroadcastAndFree(len, msg);
470     else if (pe==CLD_BROADCAST_ALL)
471       CmiSyncBroadcastAllAndFree(len, msg);
472     else CmiSyncSendAndFree(pe, len, msg);
473   }
474 }
475
476 void CldNodeEnqueue(int node, void *msg, int infofn)
477 {
478   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
479   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
480   CldPackFn pfn;
481   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
482     avg = CldMinAvg();
483     if (CldCountTokens() < avg)
484       pe = CmiMyPe();
485     else
486       pe = CpvAccess(MinProc);
487     node = CmiNodeOf(pe);
488     if (node != CmiMyNode()){
489         CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
490         CpvAccess(CldRelocatedMessages)++;
491         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
492         if (pfn) {
493             pfn(&msg);
494             ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
495         }
496         CmiSetInfo(msg,infofn);
497         CldSwitchHandler((char *)msg, CpvAccess(CldBalanceHandlerIndex));
498         CmiSyncNodeSendAndFree(node, len, msg);
499     }
500     else {
501       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
502       /* CmiSetInfo(msg,infofn);
503        CldPutToken((char *)msg); */
504       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
505     }
506   }
507   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
508     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
509 //    CmiSetInfo(msg,infofn);
510     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
511   } 
512   else {
513     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
514     if (pfn) {
515       pfn(&msg);
516       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
517     }
518     CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
519     CmiSetInfo(msg,infofn);
520     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
521     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
522     else CmiSyncNodeSendAndFree(node, len, msg);
523   }
524 }
525
526 void CldReadNeighborData(void)
527 {
528   FILE *fp;
529   char filename[25];
530   int i, *pes;
531   
532   if (CmiNumPes() <= 1)
533     return;
534   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
535   if ((fp = fopen(filename, "r")) == 0) 
536     {
537       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
538       return;
539     }
540   if (fscanf(fp, "%d", &CpvAccess(numNeighbors)) != 1) {
541     CmiAbort("CLD> reading neighbor data failed!");
542   }
543   CpvAccess(neighbors) = 
544     (struct CldNeighborData_s *)calloc(CpvAccess(numNeighbors),
545                                      sizeof(struct CldNeighborData_s));
546   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
547   for (i=0; i<CpvAccess(numNeighbors); i++) {
548     if (fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe)) != 1) {
549       CmiAbort("CLD> reading neighbor data failed!");
550     }
551     pes[i] = CpvAccess(neighbors)[i].pe;
552     CpvAccess(neighbors)[i].load = 0;
553   }
554   fclose(fp);
555   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
556 }
557
558 static void CldComputeNeighborData(void)
559 {
560   int i, npes;
561   int *pes;
562   LBtopoFn topofn;
563   void *topo;
564
565   topofn = LBTopoLookup(_lbtopo);
566   if (topofn == NULL) {
567     char str[1024];
568     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
569     printoutTopo();
570     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
571     CmiAbort(str);
572   }
573   topo = topofn(CmiNumPes());
574   npes = getTopoMaxNeighbors(topo);
575   pes = (int *)malloc(npes*sizeof(int));
576   getTopoNeighbors(topo, CmiMyPe(), pes, &npes);
577 #if 0
578   {
579   char buf[512], *ptr;
580   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npes);
581   ptr = buf + strlen(buf);
582   for (i=0; i<npes; i++) {
583     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
584     sprintf(ptr, " %d ", pes[i]);
585     ptr += strlen(ptr);
586   }
587   strcat(ptr, "\n");
588   CmiPrintf(buf);
589   }
590 #endif
591
592   CpvAccess(numNeighbors) = npes;
593   CpvAccess(neighbors) = 
594     (struct CldNeighborData_s *)calloc(npes, sizeof(struct CldNeighborData_s));
595   for (i=0; i<npes; i++) {
596     CpvAccess(neighbors)[i].pe = pes[i];
597     CpvAccess(neighbors)[i].load = 0;
598 #if ! USE_MULTICAST
599     CpvAccess(neighbors)[i].index = -1;
600 #endif
601   }
602   CpvAccess(neighborGroup) = CmiEstablishGroup(npes, pes);
603   free(pes);
604 }
605
606 static void topo_callback(void)
607 {
608   CldComputeNeighborData();
609 #if CMK_MULTICORE
610   CmiNodeBarrier();
611 #endif
612   CldBalancePeriod(NULL, CmiWallTimer());
613 }
614
615 void CldGraphModuleInit(char **argv)
616 {
617   CpvInitialize(CldProcInfo, CldData);
618   CpvInitialize(int, numNeighbors);
619   CpvInitialize(int, MinLoad);
620   CpvInitialize(int, Mindex);
621   CpvInitialize(int, MinProc);
622   CpvInitialize(int, start);
623   CpvInitialize(CmiGroup, neighborGroup);
624   CpvInitialize(CldNeighborData, neighbors);
625   CpvInitialize(int, CldBalanceHandlerIndex);
626   CpvInitialize(int, CldLoadResponseHandlerIndex);
627   CpvInitialize(int, CldAskLoadHandlerIndex);
628
629   CpvAccess(start) = -1;
630   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
631   CpvAccess(CldData)->lastCheck = -1;
632   CpvAccess(CldData)->sent = 0;
633 #if CMK_TRACE_ENABLED
634   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
635   CpvAccess(CldData)->updateLoadEvt = traceRegisterUserEvent("UpdateLoad", -1);
636   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
637   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
638 #endif
639
640   CpvAccess(MinLoad) = 0;
641   CpvAccess(Mindex) = 0;
642   CpvAccess(MinProc) = CmiMyPe();
643   CpvAccess(CldBalanceHandlerIndex) = 
644     CmiRegisterHandler(CldBalanceHandler);
645   CpvAccess(CldLoadResponseHandlerIndex) = 
646     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
647   CpvAccess(CldAskLoadHandlerIndex) = 
648     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
649
650   /* communication thread */
651   if (CmiMyRank() == CmiMyNodeSize())  return;
652
653   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
654   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
655
656   if (CmiNumPes() > 1) {
657 #if 0
658     FILE *fp;
659     char filename[20];
660   
661     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
662     if ((fp = fopen(filename, "r")) == 0)
663       {
664         if (CmiMyPe() == 0) {
665           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
666           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
667           CmiPrintf("done.\n");
668         }
669         else {
670           while (!(fp = fopen(filename, "r"))) ;
671           fclose(fp);
672         }
673       }
674     else fclose(fp);
675     CldReadNeighborData();
676 #endif
677 /* 
678     CldComputeNeighborData();
679 #if CMK_MULTICORE
680     CmiNodeBarrier();
681 #endif
682     CldBalancePeriod(NULL, CmiWallTimer());
683 */
684     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)topo_callback, NULL);
685
686   }
687
688   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_period", &LBPeriod, "time interval to do neighbor seed lb")) {
689     CmiAssert(LBPeriod>0);
690     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod);
691   }
692   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_overload", &overload_threshold, "neighbor seed lb's overload threshold")) {
693     CmiAssert(overload_threshold>0);
694     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold);
695   }
696
697 #if 1
698   _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
699   if (_lbsteal) {
700   /* register idle handlers - when idle, keep asking work from neighbors */
701   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
702       (CcdVoidFn) CldBeginIdle, NULL);
703   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
704       (CcdVoidFn) CldStillIdle, NULL);
705     if (CmiMyPe() == 0) 
706       CmiPrintf("Charm++> Work stealing is enabled. \n");
707   }
708 #endif
709 }
710
711
712 void CldModuleInit(char **argv)
713 {
714   CpvInitialize(int, CldHandlerIndex);
715   CpvInitialize(int, CldRelocatedMessages);
716   CpvInitialize(int, CldLoadBalanceMessages);
717   CpvInitialize(int, CldMessageChunks);
718   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
719   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
720   CpvAccess(CldMessageChunks) = 0;
721
722   CpvInitialize(loadmsg *, msgpool);
723   CpvAccess(msgpool) = NULL;
724
725   CldModuleGeneralInit(argv);
726   CldGraphModuleInit(argv);
727
728   CpvAccess(CldLoadNotify) = 1;
729 }