changed for immediate msg
[charm.git] / src / conv-ldb / cldb.neighbor.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define USE_MULTICAST           0
10 #define IDLE_IMMEDIATE          1
11 #define TRACE_USEREVENTS        1
12
13 #define PERIOD 20                /* default: 30 */
14 #define MAXOVERLOAD 1
15
16 static int  LBPeriod = PERIOD;                 /* time to call load balancing */
17 static int  overload_threshold = MAXOVERLOAD;
18
19 typedef struct CldProcInfo_s {
20   double lastCheck;
21   int    sent;                  /* flag to disable idle work request */
22   int    balanceEvt;            /* user event for balancing */
23   int    updateLoadEvt; 
24   int    idleEvt;               /* user event for idle balancing */
25   int    idleprocEvt;           /* user event for processing idle req */
26 } *CldProcInfo;
27
28 extern char *_lbtopo;                   /* topology name string */
29 int _lbsteal = 0;                       /* work stealing flag */
30
31 void gengraph(int, int, int, int *, int *);
32
33 CpvStaticDeclare(CldProcInfo, CldData);
34 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
35 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
36 CpvStaticDeclare(int, MinLoad);
37 CpvStaticDeclare(int, MinProc);
38 CpvStaticDeclare(int, Mindex);
39 CpvStaticDeclare(int, start);
40
41 #if ! USE_MULTICAST
42 CpvStaticDeclare(loadmsg *, msgpool);
43
44 static
45 #if CMK_C_INLINE
46 inline 
47 #endif
48 loadmsg *getPool(){
49   loadmsg *msg;
50   if (CpvAccess(msgpool)!=NULL)  {
51     msg = CpvAccess(msgpool);
52     CpvAccess(msgpool) = msg->next;
53   }
54   else {
55     msg = CmiAlloc(sizeof(loadmsg));
56     CmiSetHandler(msg, CpvAccess(CldLoadResponseHandlerIndex));
57   }
58   return msg;
59 }
60
61 static
62 #if CMK_C_INLINE
63 inline 
64 #endif
65 void putPool(loadmsg *msg)
66 {
67   msg->next = CpvAccess(msgpool);
68   CpvAccess(msgpool) = msg;
69 }
70
71 #endif
72
73 void LoadNotifyFn(int l)
74 {
75   CldProcInfo  cldData = CpvAccess(CldData);
76   cldData->sent = 0;
77 }
78
79 char *CldGetStrategy(void)
80 {
81   return "neighbor";
82 }
83
84 /* since I am idle, ask for work from neighbors */
85 static void CldBeginIdle(void *dummy)
86 {
87   CpvAccess(CldData)->lastCheck = CmiWallTimer();
88 }
89
90 static void CldEndIdle(void *dummy)
91 {
92   CpvAccess(CldData)->lastCheck = -1;
93 }
94
95 static void CldStillIdle(void *dummy, double curT)
96 {
97   int i;
98   double startT;
99   requestmsg msg;
100   int myload;
101   CldProcInfo  cldData = CpvAccess(CldData);
102
103   double now = curT;
104   double lt = cldData->lastCheck;
105   /* only ask for work every 20ms */
106   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
107   cldData->lastCheck = now;
108
109   myload = CldCountTokens();
110   if (myload > 0) return;
111
112   msg.from_pe = CmiMyPe();
113   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
114 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
115   /* fixme */
116   CmiBecomeImmediate(&msg);
117   for (i=0; i<CpvAccess(numNeighbors); i++) {
118     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
119     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
120   }
121 #else
122   msg.to_rank = -1;
123   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
124 #endif
125   cldData->sent = 1;
126
127 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
128   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
129 #endif
130 }
131
132 /* immediate message handler, work at node level */
133 /* send some work to requested proc */
134 static void CldAskLoadHandler(requestmsg *msg)
135 {
136   int receiver, rank, recvIdx, i;
137   int myload = CldCountTokens();
138   double now = CmiWallTimer();
139
140   /* only give you work if I have more than 1 */
141   if (myload>0) {
142     int sendLoad;
143     receiver = msg->from_pe;
144     rank = CmiMyRank();
145     if (msg->to_rank != -1) rank = msg->to_rank;
146 #if CMK_IMMEDIATE_MSG && IDLE_IMMEDIATE
147     /* try the lock */
148     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
149       CmiDelayImmediate();              /* postpone immediate message */
150       return;
151     }
152     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
153 #endif
154     sendLoad = myload / CpvAccess(numNeighbors) / 2;
155     if (sendLoad < 1) sendLoad = 1;
156     sendLoad = 1;
157     for (i=0; i<CpvAccess(numNeighbors); i++) 
158       if (CpvAccess(neighbors)[i].pe == receiver) break;
159     
160     if(i<CpvAccess(numNeighbors)) {CmiFree(msg); return;}   /* ? */
161     CpvAccess(neighbors)[i].load += sendLoad;
162     CldMultipleSend(receiver, sendLoad, rank, 0);
163 #if 0
164 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
165     /* this is dangerous since projections logging is not thread safe */
166     {
167     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
168     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
169     }
170 #endif
171 #endif
172   }
173   CmiFree(msg);
174 }
175
176 /* balancing by exchanging load among neighbors */
177
178 void CldSendLoad()
179 {
180 #if CMK_MULTICORE
181   /* directly send load to neighbors */
182   double myload = CldCountTokens();
183   int nNeighbors = CpvAccess(numNeighbors);
184   int i;
185   for (i=0; i<nNeighbors; i++) {
186     int neighbor_pe = CpvAccess(neighbors)[i].pe;
187     int j, found=0;
188     for (j=0; j<CpvAccessOther(numNeighbors, neighbor_pe); j++)
189       if (CpvAccessOther(neighbors, neighbor_pe)[j].pe == CmiMyPe())
190       {
191         CpvAccessOther(neighbors, neighbor_pe)[j].load = myload;
192         found = 1;
193         break;
194       }
195   }
196 #else
197 #if USE_MULTICAST
198   loadmsg msg;
199
200   msg.pe = CmiMyPe();
201   msg.load = CldCountTokens();
202   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
203   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
204   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
205 #else
206   int i;
207   int mype = CmiMyPe();
208   int myload = CldCountTokens();
209   for(i=0; i<CpvAccess(numNeighbors); i++) {
210     loadmsg *msg = getPool();
211     msg->fromindex = i;
212     msg->toindex = CpvAccess(neighbors)[i].index;
213     msg->pe = mype;
214     msg->load = myload;
215     CmiSyncSendAndFree(CpvAccess(neighbors)[i].pe, sizeof(loadmsg), msg);
216   }
217 #endif
218 #endif
219 }
220
221 int CldMinAvg()
222 {
223   int sum=0, i;
224   int myload;
225
226   int nNeighbors = CpvAccess(numNeighbors);
227   if (CpvAccess(start) == -1)
228     CpvAccess(start) = CmiMyPe() % nNeighbors;
229
230 #if 0
231     /* update load from neighbors for multicore */
232   for (i=0; i<nNeighbors; i++) {
233     CpvAccess(neighbors)[i].load = CldLoadRank(CpvAccess(neighbors)[i].pe);
234   }
235 #endif
236   CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
237   CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
238   sum = CpvAccess(neighbors)[CpvAccess(start)].load;
239   CpvAccess(Mindex) = CpvAccess(start);
240   for (i=1; i<nNeighbors; i++) {
241     CpvAccess(start) = (CpvAccess(start)+1) % nNeighbors;
242     sum += CpvAccess(neighbors)[CpvAccess(start)].load;
243     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[CpvAccess(start)].load) {
244       CpvAccess(MinLoad) = CpvAccess(neighbors)[CpvAccess(start)].load;
245       CpvAccess(MinProc) = CpvAccess(neighbors)[CpvAccess(start)].pe;
246       CpvAccess(Mindex) = CpvAccess(start);
247     }
248   }
249   CpvAccess(start) = (CpvAccess(start)+2) % nNeighbors;
250   myload = CldCountTokens();
251   sum += myload;
252   if (myload < CpvAccess(MinLoad)) {
253     CpvAccess(MinLoad) = myload;
254     CpvAccess(MinProc) = CmiMyPe();
255   }
256   i = (int)(1.0 + (((float)sum) /((float)(nNeighbors+1))));
257   return i;
258 }
259
260 void CldBalance(void *dummy, double curT)
261 {
262   int i, j, overload, numToMove=0, avgLoad;
263   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
264
265 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
266   double startT = curT;
267 #endif
268
269 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
270   avgLoad = CldMinAvg();
271 /*
272   overload = CldLoad() - avgLoad;
273   if (overload > CldCountTokens())
274     overload = CldCountTokens();
275 */
276   overload = CldCountTokens() - avgLoad;
277
278   if (overload > overload_threshold) {
279     int nNeighbors = CpvAccess(numNeighbors);
280     for (i=0; i<nNeighbors; i++)
281       if (CpvAccess(neighbors)[i].load < avgLoad) {
282         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
283         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
284           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
285         numUnderAvg++;
286       }
287     if (numUnderAvg > 0)
288       for (i=0; ((i<nNeighbors) && (overload>0)); i++) {
289           j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
290           if (CpvAccess(neighbors)[j].load < avgLoad) {
291               numToMove = (avgLoad - CpvAccess(neighbors)[j].load);
292           if (numToMove > overload)
293             numToMove = overload;
294           overload -= numToMove;
295           CpvAccess(neighbors)[j].load += numToMove;
296 #if CMK_MULTICORE
297           CldSimpleMultipleSend(CpvAccess(neighbors)[j].pe, numToMove);
298 #else
299           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
300                           numToMove, CmiMyRank(), 
301 #if CMK_SMP
302                           0
303 #else
304                           1
305 #endif
306                           );
307 #endif
308         }
309       }
310   }
311   CldSendLoad();
312 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
313   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
314 #endif
315 }
316
317 void CldBalancePeriod(void *dummy, double curT)
318 {
319     CldBalance(NULL, curT);
320     CcdCallFnAfterOnPE((CcdVoidFn)CldBalancePeriod, NULL, LBPeriod, CmiMyPe());
321 }
322
323
324 void CldLoadResponseHandler(loadmsg *msg)
325 {
326   int i;
327   
328 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
329   double startT = CmiWallTimer();
330 #endif
331 #if USE_MULTICAST
332   for(i=0; i<CpvAccess(numNeighbors); i++)
333     if (CpvAccess(neighbors)[i].pe == msg->pe) {
334       CpvAccess(neighbors)[i].load = msg->load;
335       break;
336     }
337   CmiFree(msg);
338 #else
339   int index = msg->toindex;
340   if (index == -1) {
341     for(i=0; i<CpvAccess(numNeighbors); i++)
342       if (CpvAccess(neighbors)[i].pe == msg->pe) {
343         index = i;
344         break;
345       }
346   }
347   if (index != -1) {    /* index can be -1, if neighbors table not init yet */
348     CpvAccess(neighbors)[index].load = msg->load;
349     if (CpvAccess(neighbors)[index].index == -1) CpvAccess(neighbors)[index].index = msg->fromindex;
350   }
351   putPool(msg);
352 #endif
353 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
354   traceUserBracketEvent(CpvAccess(CldData)->updateLoadEvt, startT, CmiWallTimer());
355 #endif
356 }
357
358 void CldBalanceHandler(void *msg)
359 {
360   CldRestoreHandler(msg);
361   CldPutToken(msg);
362 }
363
364 void CldHandler(void *msg)
365 {
366   CldInfoFn ifn; CldPackFn pfn;
367   int len, queueing, priobits; unsigned int *prioptr;
368   
369   CldRestoreHandler(msg);
370   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
371   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
372   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
373   /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
374 }
375
376 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
377 {
378   int len, queueing, priobits,i; unsigned int *prioptr;
379   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
380   CldPackFn pfn;
381   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
382   if (pfn) {
383     pfn(&msg);
384     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
385   }
386   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
387   CmiSetInfo(msg,infofn);
388
389   CmiSyncMulticastAndFree(grp, len, msg);
390 }
391
392 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
393 {
394   int len, queueing, priobits,i; unsigned int *prioptr;
395   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
396   CldPackFn pfn;
397   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
398   if (pfn) {
399     pfn(&msg);
400     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
401   }
402   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
403   CmiSetInfo(msg,infofn);
404   /*
405   for(i=0;i<npes;i++) {
406     CmiSyncSend(pes[i], len, msg);
407   }
408   CmiFree(msg);
409   */
410   CmiSyncListSendAndFree(npes, pes, len, msg);
411 }
412
413 void CldEnqueue(int pe, void *msg, int infofn)
414 {
415   int len, queueing, priobits, avg; unsigned int *prioptr;
416   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
417   CldPackFn pfn;
418
419   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
420     avg = CldMinAvg();
421     if (CldCountTokens() < avg)
422       pe = CmiMyPe();
423     else
424       pe = CpvAccess(MinProc);
425 #if CMK_NODE_QUEUE_AVAILABLE
426     if (CmiNodeOf(pe) == CmiMyNode()) {
427       CldNodeEnqueue(CmiMyNode(), msg, infofn);
428       return;
429     }
430 #endif
431     /* always pack the message because the message may be move away
432        to a different processor later by CldGetToken() */
433     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
434     if (pfn && CmiNumNodes()>1) {
435        pfn(&msg);
436        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
437     }
438     if (pe != CmiMyPe()) {
439       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
440       CpvAccess(CldRelocatedMessages)++;
441       CmiSetInfo(msg,infofn);
442       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
443       CmiSyncSendAndFree(pe, len, msg);
444     }
445     else {
446       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
447       CmiSetInfo(msg,infofn);
448       CldPutToken(msg);
449     }
450   } 
451   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
452     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
453     /*CmiSetInfo(msg,infofn);*/
454     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
455     /*CsdEnqueueGeneral(msg, queueing, priobits, prioptr);*/
456   }
457   else {
458     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
459     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
460       pfn(&msg);
461       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
462     }
463     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
464     CmiSetInfo(msg,infofn);
465     if (pe==CLD_BROADCAST) 
466       CmiSyncBroadcastAndFree(len, msg);
467     else if (pe==CLD_BROADCAST_ALL)
468       CmiSyncBroadcastAllAndFree(len, msg);
469     else CmiSyncSendAndFree(pe, len, msg);
470   }
471 }
472
473 void CldNodeEnqueue(int node, void *msg, int infofn)
474 {
475   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
476   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
477   CldPackFn pfn;
478   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
479     avg = CldMinAvg();
480     if (CldCountTokens() < avg)
481       pe = CmiMyPe();
482     else
483       pe = CpvAccess(MinProc);
484     node = CmiNodeOf(pe);
485     if (node != CmiMyNode()){
486         CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
487         CpvAccess(CldRelocatedMessages)++;
488         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
489         if (pfn) {
490             pfn(&msg);
491             ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
492         }
493         CmiSetInfo(msg,infofn);
494         CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
495         CmiSyncNodeSendAndFree(node, len, msg);
496     }
497     else {
498       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
499       /* CmiSetInfo(msg,infofn);
500        CldPutToken(msg); */
501       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
502     }
503   }
504   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
505     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
506 //    CmiSetInfo(msg,infofn);
507     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
508   } 
509   else {
510     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
511     if (pfn) {
512       pfn(&msg);
513       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
514     }
515     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
516     CmiSetInfo(msg,infofn);
517     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
518     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
519     else CmiSyncNodeSendAndFree(node, len, msg);
520   }
521 }
522
523 void CldReadNeighborData()
524 {
525   FILE *fp;
526   char filename[25];
527   int i, *pes;
528   
529   if (CmiNumPes() <= 1)
530     return;
531   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
532   if ((fp = fopen(filename, "r")) == 0) 
533     {
534       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
535       return;
536     }
537   fscanf(fp, "%d", &CpvAccess(numNeighbors));
538   CpvAccess(neighbors) = 
539     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
540                                      sizeof(struct CldNeighborData));
541   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
542   for (i=0; i<CpvAccess(numNeighbors); i++) {
543     fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
544     pes[i] = CpvAccess(neighbors)[i].pe;
545     CpvAccess(neighbors)[i].load = 0;
546   }
547   fclose(fp);
548   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
549 }
550
551 static void CldComputeNeighborData()
552 {
553   int i, npes;
554   int *pes;
555   LBtopoFn topofn;
556   void *topo;
557
558   topofn = LBTopoLookup(_lbtopo);
559   if (topofn == NULL) {
560     char str[1024];
561     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
562     printoutTopo();
563     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
564     CmiAbort(str);
565   }
566   topo = topofn(CmiNumPes());
567   npes = getTopoMaxNeighbors(topo);
568   pes = (int *)malloc(npes*sizeof(int));
569   getTopoNeighbors(topo, CmiMyPe(), pes, &npes);
570 #if 0
571   {
572   char buf[512], *ptr;
573   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npes);
574   ptr = buf + strlen(buf);
575   for (i=0; i<npes; i++) {
576     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
577     sprintf(ptr, " %d ", pes[i]);
578     ptr += strlen(ptr);
579   }
580   strcat(ptr, "\n");
581   CmiPrintf(buf);
582   }
583 #endif
584
585   CpvAccess(numNeighbors) = npes;
586   CpvAccess(neighbors) = 
587     (struct CldNeighborData *)calloc(npes, sizeof(struct CldNeighborData));
588   for (i=0; i<npes; i++) {
589     CpvAccess(neighbors)[i].pe = pes[i];
590     CpvAccess(neighbors)[i].load = 0;
591 #if ! USE_MULTICAST
592     CpvAccess(neighbors)[i].index = -1;
593 #endif
594   }
595   CpvAccess(neighborGroup) = CmiEstablishGroup(npes, pes);
596   free(pes);
597 }
598
599 static void topo_callback()
600 {
601   CldComputeNeighborData();
602 #if CMK_MULTICORE
603   CmiNodeBarrier();
604 #endif
605   CldBalancePeriod(NULL, CmiWallTimer());
606 }
607
608 void CldGraphModuleInit(char **argv)
609 {
610   CpvInitialize(CldProcInfo, CldData);
611   CpvInitialize(int, numNeighbors);
612   CpvInitialize(int, MinLoad);
613   CpvInitialize(int, Mindex);
614   CpvInitialize(int, MinProc);
615   CpvInitialize(int, start);
616   CpvInitialize(CmiGroup, neighborGroup);
617   CpvInitialize(CldNeighborData, neighbors);
618   CpvInitialize(int, CldBalanceHandlerIndex);
619   CpvInitialize(int, CldLoadResponseHandlerIndex);
620   CpvInitialize(int, CldAskLoadHandlerIndex);
621
622   CpvAccess(start) = -1;
623   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
624   CpvAccess(CldData)->lastCheck = -1;
625   CpvAccess(CldData)->sent = 0;
626 #if CMK_TRACE_ENABLED
627   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
628   CpvAccess(CldData)->updateLoadEvt = traceRegisterUserEvent("UpdateLoad", -1);
629   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
630   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
631 #endif
632
633   CpvAccess(MinLoad) = 0;
634   CpvAccess(Mindex) = 0;
635   CpvAccess(MinProc) = CmiMyPe();
636   CpvAccess(CldBalanceHandlerIndex) = 
637     CmiRegisterHandler(CldBalanceHandler);
638   CpvAccess(CldLoadResponseHandlerIndex) = 
639     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
640   CpvAccess(CldAskLoadHandlerIndex) = 
641     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
642
643   /* communication thread */
644   if (CmiMyRank() == CmiMyNodeSize())  return;
645
646   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
647   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
648
649   if (CmiNumPes() > 1) {
650 #if 0
651     FILE *fp;
652     char filename[20];
653   
654     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
655     if ((fp = fopen(filename, "r")) == 0)
656       {
657         if (CmiMyPe() == 0) {
658           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
659           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
660           CmiPrintf("done.\n");
661         }
662         else {
663           while (!(fp = fopen(filename, "r"))) ;
664           fclose(fp);
665         }
666       }
667     else fclose(fp);
668     CldReadNeighborData();
669 #endif
670 /* 
671     CldComputeNeighborData();
672 #if CMK_MULTICORE
673     CmiNodeBarrier();
674 #endif
675     CldBalancePeriod(NULL, CmiWallTimer());
676 */
677     CcdCallOnCondition(CcdTOPOLOGY_AVAIL, (CcdVoidFn)topo_callback, NULL);
678
679   }
680
681   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_period", &LBPeriod, "time interval to do neighbor seed lb")) {
682     CmiAssert(LBPeriod>0);
683     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor load balancing period is %d\n", LBPeriod);
684   }
685   if (CmiGetArgIntDesc(argv, "+cldb_neighbor_overload", &overload_threshold, "neighbor seed lb's overload threshold")) {
686     CmiAssert(overload_threshold>0);
687     if (CmiMyPe() == 0) CmiPrintf("Seed LB> neighbor overload threshold is %d\n", overload_threshold);
688   }
689
690 #if 1
691   _lbsteal = CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
692   if (_lbsteal) {
693   /* register idle handlers - when idle, keep asking work from neighbors */
694   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
695       (CcdVoidFn) CldBeginIdle, NULL);
696   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
697       (CcdVoidFn) CldStillIdle, NULL);
698     if (CmiMyPe() == 0) 
699       CmiPrintf("Charm++> Work stealing is enabled. \n");
700   }
701 #endif
702 }
703
704
705 void CldModuleInit(char **argv)
706 {
707   CpvInitialize(int, CldHandlerIndex);
708   CpvInitialize(int, CldRelocatedMessages);
709   CpvInitialize(int, CldLoadBalanceMessages);
710   CpvInitialize(int, CldMessageChunks);
711   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
712   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
713   CpvAccess(CldMessageChunks) = 0;
714
715   CpvInitialize(loadmsg *, msgpool);
716   CpvAccess(msgpool) = NULL;
717
718   CldModuleGeneralInit(argv);
719   CldGraphModuleInit(argv);
720
721   CpvAccess(CldLoadNotify) = 1;
722 }