Merge remote branch 'origin/charmrun' into charmrun
[charm.git] / src / conv-ldb / cldb.neighbor.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define IDLE_IMMEDIATE          1
10 #define TRACE_USEREVENTS        0
11
12 #define PERIOD 20                /* default: 30 */
13 #define MAXOVERLOAD 1
14
15 typedef struct CldProcInfo_s {
16   double lastCheck;
17   int    sent;                  /* flag to disable idle work request */
18   int    balanceEvt;            /* user event for balancing */
19   int    idleEvt;               /* user event for idle balancing */
20   int    idleprocEvt;           /* user event for processing idle req */
21 } *CldProcInfo;
22
23 extern char *_lbtopo;                   /* topology name string */
24 int _lbsteal = 0;                       /* work stealing flag */
25
26 void gengraph(int, int, int, int *, int *);
27
28 CpvStaticDeclare(CldProcInfo, CldData);
29 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, MinLoad);
32 CpvStaticDeclare(int, MinProc);
33 CpvStaticDeclare(int, Mindex);
34 CpvStaticDeclare(int, start);
35
36 void LoadNotifyFn(int l)
37 {
38   CldProcInfo  cldData = CpvAccess(CldData);
39   cldData->sent = 0;
40 }
41
42 char *CldGetStrategy(void)
43 {
44   return "neighbor";
45 }
46
47 /* since I am idle, ask for work from neighbors */
48 static void CldBeginIdle(void *dummy)
49 {
50   CpvAccess(CldData)->lastCheck = CmiWallTimer();
51 }
52
53 static void CldEndIdle(void *dummy)
54 {
55   CpvAccess(CldData)->lastCheck = -1;
56 }
57
58 static void CldStillIdle(void *dummy, double curT)
59 {
60   int i;
61   double startT;
62   requestmsg msg;
63   int myload;
64   CldProcInfo  cldData = CpvAccess(CldData);
65
66   double now = curT;
67   double lt = cldData->lastCheck;
68   /* only ask for work every 20ms */
69   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
70   cldData->lastCheck = now;
71
72   myload = CldLoad();
73   if (myload > 0) return;
74
75   msg.from_pe = CmiMyPe();
76   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
77 #if ! IDLE_IMMEDIATE
78   msg.to_rank = -1;
79   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
80 #else
81   /* fixme */
82   CmiBecomeImmediate(&msg);
83   for (i=0; i<CpvAccess(numNeighbors); i++) {
84     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
85     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
86   }
87 #endif
88   cldData->sent = 1;
89
90 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
91   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
92 #endif
93 }
94
95 /* immediate message handler, work at node level */
96 /* send some work to requested proc */
97 static void CldAskLoadHandler(requestmsg *msg)
98 {
99   int receiver, rank, recvIdx, i;
100   int myload = CldLoad();
101   double now = CmiWallTimer();
102
103   /* only give you work if I have more than 1 */
104   if (myload>0) {
105     int sendLoad;
106     receiver = msg->from_pe;
107     rank = CmiMyRank();
108     if (msg->to_rank != -1) rank = msg->to_rank;
109 #if IDLE_IMMEDIATE
110     /* try the lock */
111     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
112       CmiDelayImmediate();              /* postpone immediate message */
113       return;
114     }
115     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
116 #endif
117     sendLoad = myload / CpvAccess(numNeighbors) / 2;
118     if (sendLoad < 1) sendLoad = 1;
119     sendLoad = 1;
120     for (i=0; i<CpvAccess(numNeighbors); i++) 
121       if (CpvAccess(neighbors)[i].pe == receiver) break;
122     
123     if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;}
124     CpvAccess(neighbors)[i].load += sendLoad;
125     CldMultipleSend(receiver, sendLoad, rank, 0);
126 #if 0
127 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
128     /* this is dangerous since projections logging is not thread safe */
129     {
130     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
131     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
132     }
133 #endif
134 #endif
135   }
136   CmiFree(msg);
137 }
138
139 /* balancing by exchanging load among neighbors */
140
141 void CldSendLoad()
142 {
143 #if CMK_MULTICORE
144   /* directly send load to neighbors */
145   double myload = CldLoad();
146   int nNeighbors = CpvAccess(numNeighbors);
147   int i;
148   for (i=0; i<nNeighbors; i++) {
149     int neighbor_pe = CpvAccess(neighbors)[i].pe;
150     int j, found=0;
151     for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
152       if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
153       {
154         CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
155         found = 1;
156         break;
157       }
158   }
159 #else
160   loadmsg msg;
161
162   msg.pe = CmiMyPe();
163   msg.load = CldLoad();
164   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
165   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
166   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
167 #endif
168 }
169
170 int CldMinAvg()
171 {
172   int sum=0, i;
173
174   int nNeighbors = CpvAccess(numNeighbors);
175   if (CpvAccess(start) == -1)
176     CpvAccess(start) = CmiMyPe() % nNeighbors;
177
178 #if 0
179     /* update load from neighbors for multicore */
180   for (i=0; i<nNeighbors; i++) {
181     CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
182   }
183 #endif
184   CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
185   CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
186   sum = CpvAccess(neighbors)[CpvAccess(start)].load;
187   CpvAccess(Mindex) = CpvAccess(start);
188   for (i=1; i<nNeighbors; i++) {
189     CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
190     sum += CpvAccess(neighbors)[CpvAccess(start)].load;
191     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
192       CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
193       CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
194       CpvAccess(Mindex) = CpvAccess(start);
195     }
196   }
197   CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
198   sum += CldLoad();
199   if (CldLoad() < CpvAccess(MinLoad)) {
200     CpvAccess(MinLoad) = CldLoad();
201     CpvAccess(MinProc) = CmiMyPe();
202   }
203   i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
204   return i;
205 }
206
207 void CldBalance(void *dummy, double curT)
208 {
209   int i, j, overload, numToMove=0, avgLoad;
210   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
211
212 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
213   double startT = curT;
214 #endif
215
216 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
217   avgLoad = CldMinAvg();
218   overload = CldLoad() - avgLoad;
219   if (overload > CldCountTokens())
220     overload = CldCountTokens();
221
222   if (overload > MAXOVERLOAD) {
223     int nNeighbors = CpvAccess(numNeighbors);
224     for (i=0; i<nNeighbors; i++)
225       if (CpvAccess(neighbors)[i].load < avgLoad) {
226         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
227         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
228           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
229         numUnderAvg++;
230       }
231     if (numUnderAvg > 0)
232       for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
233           j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
234           if (CpvAccess(neighbors)[j].load < avgLoad) {
235               numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
236           if (numToMove > overload)
237             numToMove = overload;
238           overload -= numToMove;
239           CpvAccess(neighbors)[j].load += numToMove;
240 #if CMK_MULTICORE
241           CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
242 #else
243           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
244                           numToMove, CmiMyRank(), 
245 #if CMK_SMP
246                           0
247 #else
248                           1
249 #endif
250                           );
251 #endif
252         }
253       }
254   }
255   CldSendLoad();
256 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
257   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
258 #endif
259 }
260
261 void CldBalancePeriod(void *dummy, double curT)
262 {
263     CldBalance(NULL, curT);
264     CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, PERIOD, CmiMyPe());
265 }
266
267
268 void CldLoadResponseHandler(loadmsg *msg)
269 {
270   int i;
271
272   for(i=0; i<CpvAccess(numNeighbors); i++)
273     if (CpvAccess(neighbors)[i].pe == msg->pe) {
274       CpvAccess(neighbors)[i].load = msg->load;
275       break;
276     }
277   CmiFree(msg);
278 }
279
280 void CldBalanceHandler(void *msg)
281 {
282   CldRestoreHandler(msg);
283   CldPutToken(msg);
284 }
285
286 void CldHandler(void *msg)
287 {
288   CldInfoFn ifn; CldPackFn pfn;
289   int len, queueing, priobits; unsigned int *prioptr;
290   
291   CldRestoreHandler(msg);
292   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
293   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
294   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
295   /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
296 }
297
298 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
299 {
300   int len, queueing, priobits,i; unsigned int *prioptr;
301   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
302   CldPackFn pfn;
303   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
304   if (pfn) {
305     pfn(&msg);
306     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
307   }
308   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
309   CmiSetInfo(msg,infofn);
310
311   CmiSyncMulticastAndFree(grp, len, msg);
312 }
313
314 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
315 {
316   int len, queueing, priobits,i; unsigned int *prioptr;
317   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
318   CldPackFn pfn;
319   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
320   if (pfn) {
321     pfn(&msg);
322     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
323   }
324   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
325   CmiSetInfo(msg,infofn);
326   /*
327   for(i=0;i<npes;i++) {
328     CmiSyncSend(pes[i], len, msg);
329   }
330   CmiFree(msg);
331   */
332   CmiSyncListSendAndFree(npes, pes, len, msg);
333 }
334
335 void CldEnqueue(int pe, void *msg, int infofn)
336 {
337   int len, queueing, priobits, avg; unsigned int *prioptr;
338   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
339   CldPackFn pfn;
340
341   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
342     avg = CldMinAvg();
343     if (CldLoad() < avg)
344       pe = CmiMyPe();
345     else
346       pe = CpvAccess(MinProc);
347 #if CMK_NODE_QUEUE_AVAILABLE
348     if (CmiNodeOf(pe) == CmiMyNode()) {
349       CldNodeEnqueue(CmiMyNode(), msg, infofn);
350       return;
351     }
352 #endif
353     /* always pack the message because the message may be move away
354        to a different processor later by CldGetToken() */
355     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
356     if (pfn && CmiNumNodes()>1) {
357        pfn(&msg);
358        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
359     }
360     if (pe != CmiMyPe()) {
361       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
362       CpvAccess(CldRelocatedMessages)++;
363       CmiSetInfo(msg,infofn);
364       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
365       CmiSyncSendAndFree(pe, len, msg);
366     }
367     else {
368       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
369       CmiSetInfo(msg,infofn);
370       CldPutToken(msg);
371     }
372   } 
373   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
374     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
375     /*CmiSetInfo(msg,infofn);*/
376     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
377     /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
378   }
379   else {
380     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
381     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
382       pfn(&msg);
383       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
384     }
385     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
386     CmiSetInfo(msg,infofn);
387     if (pe==CLD_BROADCAST) 
388       CmiSyncBroadcastAndFree(len, msg);
389     else if (pe==CLD_BROADCAST_ALL)
390       CmiSyncBroadcastAllAndFree(len, msg);
391     else CmiSyncSendAndFree(pe, len, msg);
392   }
393 }
394
395 void CldNodeEnqueue(int node, void *msg, int infofn)
396 {
397   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
398   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
399   CldPackFn pfn;
400   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
401     avg = CldMinAvg();
402     if (CldLoad() < avg)
403       pe = CmiMyPe();
404     else
405       pe = CpvAccess(MinProc);
406     node = CmiNodeOf(pe);
407     if (node != CmiMyNode()){
408         CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
409         CpvAccess(CldRelocatedMessages)++;
410         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
411         if (pfn) {
412             pfn(&msg);
413             ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
414         }
415         CmiSetInfo(msg,infofn);
416         CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
417         CmiSyncNodeSendAndFree(node, len, msg);
418     }
419     else {
420       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
421       /* CmiSetInfo(msg,infofn);
422        CldPutToken(msg); */
423       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
424     }
425   }
426   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
427     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
428 //    CmiSetInfo(msg,infofn);
429     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
430   } 
431   else {
432     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
433     if (pfn) {
434       pfn(&msg);
435       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
436     }
437     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
438     CmiSetInfo(msg,infofn);
439     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
440     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
441     else CmiSyncNodeSendAndFree(node, len, msg);
442   }
443 }
444
445 void CldReadNeighborData()
446 {
447   FILE *fp;
448   char filename[25];
449   int i, *pes;
450   
451   if (CmiNumPes() <= 1)
452     return;
453   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
454   if ((fp = fopen(filename, "r")) == 0) 
455     {
456       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
457       return;
458     }
459   fscanf(fp, "%d", &CpvAccess(numNeighbors));
460   CpvAccess(neighbors) = 
461     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
462                                      sizeof(struct CldNeighborData));
463   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
464   for (i=0; i<CpvAccess(numNeighbors); i++) {
465     fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
466     pes[i] = CpvAccess(neighbors)[i].pe;
467     CpvAccess(neighbors)[i].load = 0;
468   }
469   fclose(fp);
470   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
471 }
472
473 static void CldComputeNeighborData()
474 {
475   int i, npe;
476   int *pes;
477   LBtopoFn topofn;
478   void *topo;
479
480   topofn = LBTopoLookup(_lbtopo);
481   if (topofn == NULL) {
482     char str[1024];
483     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
484     printoutTopo();
485     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
486     CmiAbort(str);
487   }
488   topo = topofn(CmiNumPes());
489   npe = getTopoMaxNeighbors(topo);
490   pes = (int *)malloc(npe*sizeof(int));
491   getTopoNeighbors(topo, CmiMyPe(), pes, &npe);
492 #if 0
493   {
494   char buf[512], *ptr;
495   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npe);
496   ptr = buf + strlen(buf);
497   for (i=0; i<npe; i++) {
498     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
499     sprintf(ptr, " %d ", pes[i]);
500     ptr += strlen(ptr);
501   }
502   strcat(ptr, "\n");
503   CmiPrintf(buf);
504   }
505 #endif
506
507   CpvAccess(numNeighbors) = npe;
508   CpvAccess(neighbors) = 
509     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
510                                      sizeof(struct CldNeighborData));
511   for (i=0; i<CpvAccess(numNeighbors); i++) {
512     CpvAccess(neighbors)[i].pe = pes[i];
513     CpvAccess(neighbors)[i].load = 0;
514   }
515   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
516   free(pes);
517 }
518
519 void CldGraphModuleInit(char **argv)
520 {
521   CpvInitialize(CldProcInfo, CldData);
522   CpvInitialize(int, numNeighbors);
523   CpvInitialize(int, MinLoad);
524   CpvInitialize(int, Mindex);
525   CpvInitialize(int, MinProc);
526   CpvInitialize(int, start);
527   CpvInitialize(CmiGroup, neighborGroup);
528   CpvInitialize(CldNeighborData, neighbors);
529   CpvInitialize(int, CldBalanceHandlerIndex);
530   CpvInitialize(int, CldLoadResponseHandlerIndex);
531   CpvInitialize(int, CldAskLoadHandlerIndex);
532
533   CpvAccess(start) = -1;
534   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
535   CpvAccess(CldData)->lastCheck = -1;
536   CpvAccess(CldData)->sent = 0;
537 #if CMK_TRACE_ENABLED
538   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
539   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
540   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
541 #endif
542
543   CpvAccess(MinLoad) = 0;
544   CpvAccess(Mindex) = 0;
545   CpvAccess(MinProc) = CmiMyPe();
546   CpvAccess(CldBalanceHandlerIndex) = 
547     CmiRegisterHandler(CldBalanceHandler);
548   CpvAccess(CldLoadResponseHandlerIndex) = 
549     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
550   CpvAccess(CldAskLoadHandlerIndex) = 
551     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
552
553   /* communication thread */
554   if (CmiMyRank() == CmiMyNodeSize())  return;
555
556   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
557   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
558
559   if (CmiNumPes() > 1) {
560 #if 0
561     FILE *fp;
562     char filename[20];
563   
564     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
565     if ((fp = fopen(filename, "r")) == 0)
566       {
567         if (CmiMyPe() == 0) {
568           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
569           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
570           CmiPrintf("done.\n");
571         }
572         else {
573           while (!(fp = fopen(filename, "r"))) ;
574           fclose(fp);
575         }
576       }
577     else fclose(fp);
578     CldReadNeighborData();
579 #endif
580     CldComputeNeighborData();
581     #if CMK_MULTICORE
582     CmiNodeBarrier();
583 #endif
584     CldBalancePeriod(NULL, CmiWallTimer());
585
586   }
587
588 #if 1
589   _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
590   if (_lbsteal) {
591   /* register idle handlers - when idle, keep asking work from neighbors */
592   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
593       (CcdVoidFn) CldStillIdle, NULL);
594   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
595       (CcdVoidFn) CldStillIdle, NULL);
596     if (CmiMyPe() == 0) 
597       CmiPrintf("Charm++> Work stealing is enabled. \n");
598   }
599 #endif
600 }
601
602
603 void CldModuleInit(char **argv)
604 {
605   CpvInitialize(int, CldHandlerIndex);
606   CpvInitialize(int, CldRelocatedMessages);
607   CpvInitialize(int, CldLoadBalanceMessages);
608   CpvInitialize(int, CldMessageChunks);
609   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
610   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
611   CpvAccess(CldMessageChunks) = 0;
612
613   CldModuleGeneralInit(argv);
614   CldGraphModuleInit(argv);
615
616   CpvAccess(CldLoadNotify) = 1;
617 }