add phase change notification
[charm.git] / src / conv-ldb / cldb.neighbor.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define USE_MULTICAST           0
10 #define IDLE_IMMEDIATE          1
11 #define TRACE_USEREVENTS        1
12
13 #define PERIOD 20                /* default: 30 */
14 #define MAXOVERLOAD 1
15
16 static int  LBPeriod = PERIOD;                 /* time to call load balancing */
17 static int  overload_threshold = MAXOVERLOAD;
18
19 typedef struct CldProcInfo_s {
20   double lastCheck;
21   int    sent;                  /* flag to disable idle work request */
22   int    balanceEvt;            /* user event for balancing */
23   int    updateLoadEvt; 
24   int    idleEvt;               /* user event for idle balancing */
25   int    idleprocEvt;           /* user event for processing idle req */
26 } *CldProcInfo;
27
28 extern char *_lbtopo;                   /* topology name string */
29 int _lbsteal = 0;                       /* work stealing flag */
30
31 void gengraph(int, int, int, int *, int *);
32
33 CpvStaticDeclare(CldProcInfo, CldData);
34 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
35 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
36 CpvStaticDeclare(int, MinLoad);
37 CpvStaticDeclare(int, MinProc);
38 CpvStaticDeclare(int, Mindex);
39 CpvStaticDeclare(int, start);
40
41 #if ! USE_MULTICAST
42 CpvStaticDeclare(loadmsg *, msgpool);
43
44 static
45 #if CMK_C_INLINE
46 inline 
47 #endif
48 loadmsg *getPool(){
49   loadmsg *msg;
50   if (CpvAccess(msgpool)!=NULL)  {
51     msg = CpvAccess(msgpool);
52     CpvAccess(msgpool) = msg->next;
53   }
54   else {
55     msg = CmiAlloc(sizeof(loadmsg));
56     CmiSetHandler(msg, CpvAccess(CldLoadResponseHandlerIndex));
57   }
58   return msg;
59 }
60
61 static
62 #if CMK_C_INLINE
63 inline 
64 #endif
65 void putPool(loadmsg *msg)
66 {
67   msg->next = CpvAccess(msgpool);
68   CpvAccess(msgpool) = msg;
69 }
70
71 #endif
72
73 void LoadNotifyFn(int l)
74 {
75   CldProcInfo  cldData = CpvAccess(CldData);
76   cldData->sent = 0;
77 }
78
79 char *CldGetStrategy(void)
80 {
81   return "neighbor";
82 }
83
84 /* since I am idle, ask for work from neighbors */
85 static void CldBeginIdle(void *dummy)
86 {
87   CpvAccess(CldData)->lastCheck = CmiWallTimer();
88 }
89
90 static void CldEndIdle(void *dummy)
91 {
92   CpvAccess(CldData)->lastCheck = -1;
93 }
94
95 static void CldStillIdle(void *dummy, double curT)
96 {
97   int i;
98   double startT;
99   requestmsg msg;
100   int myload;
101   CldProcInfo  cldData = CpvAccess(CldData);
102
103   double now = curT;
104   double lt = cldData->lastCheck;
105   /* only ask for work every 20ms */
106   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
107   cldData->lastCheck = now;
108
109   myload = CldCountTokens();
110   if (myload > 0) return;
111
112   msg.from_pe = CmiMyPe();
113   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
114 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
115   /* fixme */
116   CmiBecomeImmediate(&msg);
117   for (i=0; i<CpvAccess(numNeighbors); i++) {
118     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
119     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
120   }
121 #else
122   msg.to_rank = -1;
123   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
124 #endif
125   cldData->sent = 1;
126
127 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
128   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
129 #endif
130 }
131
132 /* immediate message handler, work at node level */
133 /* send some work to requested proc */
134 static void CldAskLoadHandler(requestmsg *msg)
135 {
136   int receiver, rank, recvIdx, i;
137   int myload = CldCountTokens();
138   double now = CmiWallTimer();
139
140   /* only give you work if I have more than 1 */
141   if (myload>0) {
142     int sendLoad;
143     receiver = msg->from_pe;
144     rank = CmiMyRank();
145     if (msg->to_rank != -1) rank = msg->to_rank;
146 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
147     /* try the lock */
148     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
149       CmiDelayImmediate();              /* postpone immediate message */
150       return;
151     }
152     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
153 #endif
154     sendLoad = myload / CpvAccess(numNeighbors) / 2;
155     if (sendLoad < 1) sendLoad = 1;
156     sendLoad = 1;
157     for (i=0; i<CpvAccess(numNeighbors); i++) 
158       if (CpvAccess(neighbors)[i].pe == receiver) break;
159     
160     if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;}   /* ? */
161     CpvAccess(neighbors)[i].load += sendLoad;
162     CldMultipleSend(receiver, sendLoad, rank, 0);
163 #if 0
164 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
165     /* this is dangerous since projections logging is not thread safe */
166     {
167     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
168     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
169     }
170 #endif
171 #endif
172   }
173   CmiFree(msg);
174 }
175
176 /* balancing by exchanging load among neighbors */
177
178 void CldSendLoad()
179 {
180 #if CMK_MULTICORE
181   /* directly send load to neighbors */
182   double myload = CldCountTokens();
183   int nNeighbors = CpvAccess(numNeighbors);
184   int i;
185   for (i=0; i<nNeighbors; i++) {
186     int neighbor_pe = CpvAccess(neighbors)[i].pe;
187     int j, found=0;
188     for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
189       if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
190       {
191         CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
192         found = 1;
193         break;
194       }
195   }
196 #else
197 #if USE_MULTICAST
198   loadmsg msg;
199
200   msg.pe = CmiMyPe();
201   msg.load = CldCountTokens();
202   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
203   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
204   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
205 #else
206   int i;
207   int mype = CmiMyPe();
208   int myload = CldCountTokens();
209   for(i=0; i<CpvAccess(numNeighbors); i++) {
210     loadmsg *msg = getPool();
211     msg->fromindex = i;
212     msg->toindex = CpvAccess(neighbors)[i].index;
213     msg->pe = mype;
214     msg->load = myload;
215     CmiSyncSendAndFree(CpvAccess(neighbors)[i].pe, sizeof(loadmsg), msg);
216   }
217 #endif
218 #endif
219 }
220
221 int CldMinAvg()
222 {
223   int sum=0, i;
224   int myload;
225
226   int nNeighbors = CpvAccess(numNeighbors);
227   if (CpvAccess(start) == -1)
228     CpvAccess(start) = CmiMyPe() % nNeighbors;
229
230 #if 0
231     /* update load from neighbors for multicore */
232   for (i=0; i<nNeighbors; i++) {
233     CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
234   }
235 #endif
236   CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
237   CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
238   sum = CpvAccess(neighbors)[CpvAccess(start)].load;
239   CpvAccess(Mindex) = CpvAccess(start);
240   for (i=1; i<nNeighbors; i++) {
241     CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
242     sum += CpvAccess(neighbors)[CpvAccess(start)].load;
243     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
244       CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
245       CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
246       CpvAccess(Mindex) = CpvAccess(start);
247     }
248   }
249   CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
250   myload = CldCountTokens();
251   sum += myload;
252   if (myload < CpvAccess(MinLoad)) {
253     CpvAccess(MinLoad) = myload;
254     CpvAccess(MinProc) = CmiMyPe();
255   }
256   i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
257   return i;
258 }
259
260 void CldBalance(void *dummy, double curT)
261 {
262   int i, j, overload, numToMove=0, avgLoad;
263   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
264
265 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
266   double startT = curT;
267 #endif
268
269 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
270   avgLoad = CldMinAvg();
271 /*
272   overload = CldLoad() - avgLoad;
273   if (overload > CldCountTokens())
274     overload = CldCountTokens();
275 */
276   overload = CldCountTokens() - avgLoad;
277
278   if (overload > overload_threshold) {
279     int nNeighbors = CpvAccess(numNeighbors);
280     for (i=0; i<nNeighbors; i++)
281       if (CpvAccess(neighbors)[i].load < avgLoad) {
282         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
283         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
284           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
285         numUnderAvg++;
286       }
287     if (numUnderAvg > 0)  {
288       int myrank = CmiMyRank();
289       for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
290         j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
291         if (CpvAccess(neighbors)[j].load < avgLoad) {
292           numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
293           if (numToMove > overload)
294               numToMove = overload;
295           overload -= numToMove;
296           CpvAccess(neighbors)[j].load += numToMove;
297 #if CMK_MULTICORE || CMK_USE_IBVERBS
298           CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove, myrank);
299 #else
300           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
301                           numToMove, myrank, 
302 #if CMK_SMP
303                           0
304 #else
305                           1
306 #endif
307                           );
308 #endif
309         }
310       }
311     }             /* end of numUnderAvg > 0 */
312   }
313   CldSendLoad();
314 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
315   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
316 #endif
317 }
318
319 void CldBalancePeriod(void *dummy, double curT)
320 {
321     CldBalance(NULL, curT);
322     CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, LBPeriod, CmiMyPe());
323 }
324
325
326 void CldLoadResponseHandler(loadmsg *msg)
327 {
328   int i;
329   
330 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
331   double startT = CmiWallTimer();
332 #endif
333 #if USE_MULTICAST
334   for(i=0; i<CpvAccess(numNeighbors); i++)
335     if (CpvAccess(neighbors)[i].pe == msg->pe) {
336       CpvAccess(neighbors)[i].load = msg->load;
337       break;
338     }
339   CmiFree(msg);
340 #else
341   int index = msg->toindex;
342   if (index == -1) {
343     for(i=0; i<CpvAccess(numNeighbors); i++)
344       if (CpvAccess(neighbors)[i].pe == msg->pe) {
345         index = i;
346         break;
347       }
348   }
349   if (index != -1) {    /* index can be -1, if neighbors table not init yet */
350     CpvAccess(neighbors)[index].load = msg->load;
351     if (CpvAccess(neighbors)[index].index == -1) CpvAccess(neighbors)[index].index = msg->fromindex;
352   }
353   putPool(msg);
354 #endif
355 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
356   traceUserBracketEvent(CpvAccess(CldData)->updateLoadEvt, startT, CmiWallTimer());
357 #endif
358 }
359
360 void CldBalanceHandler(void *msg)
361 {
362   CldRestoreHandler(msg);
363   CldPutToken(msg);
364 }
365
366 void CldHandler(void *msg)
367 {
368   CldInfoFn ifn; CldPackFn pfn;
369   int len, queueing, priobits; unsigned int *prioptr;
370   
371   CldRestoreHandler(msg);
372   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
373   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
374   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
375   /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
376 }
377
378 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
379 {
380   int len, queueing, priobits,i; unsigned int *prioptr;
381   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
382   CldPackFn pfn;
383   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
384   if (pfn) {
385     pfn(&msg);
386     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
387   }
388   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
389   CmiSetInfo(msg,infofn);
390
391   CmiSyncMulticastAndFree(grp, len, msg);
392 }
393
394 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
395 {
396   int len, queueing, priobits,i; unsigned int *prioptr;
397   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
398   CldPackFn pfn;
399   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
400   if (pfn) {
401     pfn(&msg);
402     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
403   }
404   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
405   CmiSetInfo(msg,infofn);
406   /*
407   for(i=0;i<npes;i++) {
408     CmiSyncSend(pes[i], len, msg);
409   }
410   CmiFree(msg);
411   */
412   CmiSyncListSendAndFree(npes, pes, len, msg);
413 }
414
415 void CldEnqueue(int pe, void *msg, int infofn)
416 {
417   int len, queueing, priobits, avg; unsigned int *prioptr;
418   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
419   CldPackFn pfn;
420
421   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
422     avg = CldMinAvg();
423     if (CldCountTokens() < avg)
424       pe = CmiMyPe();
425     else
426       pe = CpvAccess(MinProc);
427 #if CMK_NODE_QUEUE_AVAILABLE
428     if (CmiNodeOf(pe) == CmiMyNode()) {
429       CldNodeEnqueue(CmiMyNode(), msg, infofn);
430       return;
431     }
432 #endif
433     /* always pack the message because the message may be move away
434        to a different processor later by CldGetToken() */
435     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
436     if (pfn && CmiNumNodes()>1) {
437        pfn(&msg);
438        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
439     }
440     if (pe != CmiMyPe()) {
441       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
442       CpvAccess(CldRelocatedMessages)++;
443       CmiSetInfo(msg,infofn);
444       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
445       CmiSyncSendAndFree(pe, len, msg);
446     }
447     else {
448       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
449       CmiSetInfo(msg,infofn);
450       CldPutToken(msg);
451     }
452   } 
453   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
454     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
455     /*CmiSetInfo(msg,infofn);*/
456     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
457     /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
458   }
459   else {
460     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
461     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
462       pfn(&msg);
463       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
464     }
465     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
466     CmiSetInfo(msg,infofn);
467     if (pe==CLD_BROADCAST) 
468       CmiSyncBroadcastAndFree(len, msg);
469     else if (pe==CLD_BROADCAST_ALL)
470       CmiSyncBroadcastAllAndFree(len, msg);
471     else CmiSyncSendAndFree(pe, len, msg);
472   }
473 }
474
475 void CldNodeEnqueue(int node, void *msg, int infofn)
476 {
477   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
478   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
479   CldPackFn pfn;
480   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
481     avg = CldMinAvg();
482     if (CldCountTokens() < avg)
483       pe = CmiMyPe();
484     else
485       pe = CpvAccess(MinProc);
486     node = CmiNodeOf(pe);
487     if (node != CmiMyNode()){
488         CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
489         CpvAccess(CldRelocatedMessages)++;
490         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
491         if (pfn) {
492             pfn(&msg);
493             ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
494         }
495         CmiSetInfo(msg,infofn);
496         CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
497         CmiSyncNodeSendAndFree(node, len, msg);
498     }
499     else {
500       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
501       /* CmiSetInfo(msg,infofn);
502        CldPutToken(msg); */
503       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
504     }
505   }
506   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
507     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
508 //    CmiSetInfo(msg,infofn);
509     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
510   } 
511   else {
512     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
513     if (pfn) {
514       pfn(&msg);
515       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
516     }
517     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
518     CmiSetInfo(msg,infofn);
519     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
520     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
521     else CmiSyncNodeSendAndFree(node, len, msg);
522   }
523 }
524
525 void CldReadNeighborData()
526 {
527   FILE *fp;
528   char filename[25];
529   int i, *pes;
530   
531   if (CmiNumPes() <= 1)
532     return;
533   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
534   if ((fp = fopen(filename, "r")) == 0) 
535     {
536       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
537       return;
538     }
539   fscanf(fp, "%d", &CpvAccess(numNeighbors));
540   CpvAccess(neighbors) = 
541     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
542                                      sizeof(struct CldNeighborData));
543   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
544   for (i=0; i<CpvAccess(numNeighbors); i++) {
545     fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
546     pes[i] = CpvAccess(neighbors)[i].pe;
547     CpvAccess(neighbors)[i].load = 0;
548   }
549   fclose(fp);
550   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
551 }
552
553 static void CldComputeNeighborData()
554 {
555   int i, npes;
556   int *pes;
557   LBtopoFn topofn;
558   void *topo;
559
560   topofn = LBTopoLookup(_lbtopo);
561   if (topofn == NULL) {
562     char str[1024];
563     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
564     printoutTopo();
565     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
566     CmiAbort(str);
567   }
568   topo = topofn(CmiNumPes());
569   npes = getTopoMaxNeighbors(topo);
570   pes = (int *)malloc(npes*sizeof(int));
571   getTopoNeighbors(topo, CmiMyPe(), pes, &npes);
572 #if 0
573   {
574   char buf[512], *ptr;
575   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npes);
576   ptr = buf + strlen(buf);
577   for (i=0; i<npes; i++) {
578     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
579     sprintf(ptr, " %d ", pes[i]);
580     ptr += strlen(ptr);
581   }
582   strcat(ptr, "\n");
583   CmiPrintf(buf);
584   }
585 #endif
586
587   CpvAccess(numNeighbors) = npes;
588   CpvAccess(neighbors) = 
589     (struct CldNeighborData *)calloc(npes, sizeof(struct CldNeighborData));
590   for (i=0; i<npes; i++) {
591     CpvAccess(neighbors)[i].pe = pes[i];
592     CpvAccess(neighbors)[i].load = 0;
593 #if ! USE_MULTICAST
594     CpvAccess(neighbors)[i].index = -1;
595 #endif
596   }
597   CpvAccess(neighborGroup) = CmiEstablishGroup(npes, pes);
598   free(pes);
599 }
600
601 static void topo_callback()
602 {
603   CldComputeNeighborData();
604 #if CMK_MULTICORE
605   CmiNodeBarrier();
606 #endif
607   CldBalancePeriod(NULL, CmiWallTimer());
608 }
609
610 void CldGraphModuleInit(char **argv)
611 {
612   CpvInitialize(CldProcInfo, CldData);
613   CpvInitialize(int, numNeighbors);
614   CpvInitialize(int, MinLoad);
615   CpvInitialize(int, Mindex);
616   CpvInitialize(int, MinProc);
617   CpvInitialize(int, start);
618   CpvInitialize(CmiGroup, neighborGroup);
619   CpvInitialize(CldNeighborData, neighbors);
620   CpvInitialize(int, CldBalanceHandlerIndex);
621   CpvInitialize(int, CldLoadResponseHandlerIndex);
622   CpvInitialize(int, CldAskLoadHandlerIndex);
623
624   CpvAccess(start) = -1;
625   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
626   CpvAccess(CldData)->lastCheck = -1;
627   CpvAccess(CldData)->sent = 0;
628 #if CMK_TRACE_ENABLED
629   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
630   CpvAccess(CldData)->updateLoadEvt = traceRegisterUserEvent("UpdateLoad", -1);
631   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
632   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
633 #endif
634
635   CpvAccess(MinLoad) = 0;
636   CpvAccess(Mindex) = 0;
637   CpvAccess(MinProc) = CmiMyPe();
638   CpvAccess(CldBalanceHandlerIndex) = 
639     CmiRegisterHandler(CldBalanceHandler);
640   CpvAccess(CldLoadResponseHandlerIndex) = 
641     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
642   CpvAccess(CldAskLoadHandlerIndex) = 
643     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
644
645   /* communication thread */
646   if (CmiMyRank() == CmiMyNodeSize())  return;
647
648   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
649   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
650
651   if (CmiNumPes() > 1) {
652 #if 0
653     FILE *fp;
654     char filename[20];
655   
656     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
657     if ((fp = fopen(filename, "r")) == 0)
658       {
659         if (CmiMyPe() == 0) {
660           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
661           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
662           CmiPrintf("done.\n");
663         }
664         else {
665           while (!(fp = fopen(filename, "r"))) ;
666           fclose(fp);
667         }
668       }
669     else fclose(fp);
670     CldReadNeighborData();
671 #endif
672 /* 
673     CldComputeNeighborData();
674 #if CMK_MULTICORE
675     CmiNodeBarrier();
676 #endif
677     CldBalancePeriod(NULL, CmiWallTimer());
678 */
679     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)topo_callback, NULL);
680
681   }
682
683   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_period", &LBPeriod, "time interval to do neighbor seed lb")) {
684     CmiAssert(LBPeriod>0);
685     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod);
686   }
687   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_overload", &overload_threshold, "neighbor seed lb's overload threshold")) {
688     CmiAssert(overload_threshold>0);
689     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold);
690   }
691
692 #if 1
693   _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
694   if (_lbsteal) {
695   /* register idle handlers - when idle, keep asking work from neighbors */
696   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
697       (CcdVoidFn) CldBeginIdle, NULL);
698   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
699       (CcdVoidFn) CldStillIdle, NULL);
700     if (CmiMyPe() == 0) 
701       CmiPrintf("Charm++> Work stealing is enabled. \n");
702   }
703 #endif
704 }
705
706
707 void CldModuleInit(char **argv)
708 {
709   CpvInitialize(int, CldHandlerIndex);
710   CpvInitialize(int, CldRelocatedMessages);
711   CpvInitialize(int, CldLoadBalanceMessages);
712   CpvInitialize(int, CldMessageChunks);
713   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
714   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
715   CpvAccess(CldMessageChunks) = 0;
716
717   CpvInitialize(loadmsg *, msgpool);
718   CpvAccess(msgpool) = NULL;
719
720   CldModuleGeneralInit(argv);
721   CldGraphModuleInit(argv);
722
723   CpvAccess(CldLoadNotify) = 1;
724 }