updated topo object creation function to the latest version.
[charm.git] / src / conv-ldb / cldb.neighbor.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define IDLE_IMMEDIATE          1
10 #define TRACE_USEREVENTS        0
11
12 #define PERIOD 20                /* default: 30 */
13 #define MAXOVERLOAD 1
14
15 typedef struct CldProcInfo_s {
16   double lastCheck;
17   int    sent;                  /* flag to disable idle work request */
18   int    balanceEvt;            /* user event for balancing */
19   int    idleEvt;               /* user event for idle balancing */
20   int    idleprocEvt;           /* user event for processing idle req */
21 } *CldProcInfo;
22
23 extern char *_lbtopo;                   /* topology name string */
24
25 void gengraph(int, int, int, int *, int *);
26
27 CpvStaticDeclare(CldProcInfo, CldData);
28 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
29 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
30 CpvStaticDeclare(int, MinLoad);
31 CpvStaticDeclare(int, MinProc);
32 CpvStaticDeclare(int, Mindex);
33
34 void LoadNotifyFn(int l)
35 {
36   CldProcInfo  cldData = CpvAccess(CldData);
37   cldData->sent = 0;
38 }
39
40 char *CldGetStrategy(void)
41 {
42   return "neighbor";
43 }
44
45 /* since I am idle, ask for work from neighbors */
46 static void CldBeginIdle(void *dummy)
47 {
48   CpvAccess(CldData)->lastCheck = CmiWallTimer();
49 }
50
51 static void CldEndIdle(void *dummy)
52 {
53   CpvAccess(CldData)->lastCheck = -1;
54 }
55
56 static void CldStillIdle(void *dummy)
57 {
58   int i;
59   double startT;
60   requestmsg msg;
61   int myload;
62   CldProcInfo  cldData = CpvAccess(CldData);
63
64   double now = CmiWallTimer();
65   double lt = cldData->lastCheck;
66   /* only ask for work every 20ms */
67   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
68   cldData->lastCheck = now;
69
70   myload = CldLoad();
71   CmiAssert(myload == 0);
72   if (myload > 0) return;
73
74   msg.from_pe = CmiMyPe();
75   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
76 #if ! IDLE_IMMEDIATE
77   msg.to_rank = -1;
78   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
79 #else
80   /* fixme */
81   CmiBecomeImmediate(&msg);
82   for (i=0; i<CpvAccess(numNeighbors); i++) {
83     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
84     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
85   }
86 #endif
87   cldData->sent = 1;
88
89 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
90   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
91 #endif
92 }
93
94 /* immediate message handler, work at node level */
95 /* send some work to requested proc */
96 static void CldAskLoadHandler(requestmsg *msg)
97 {
98   int receiver, rank, recvIdx, i;
99   int myload = CldLoad();
100   double now = CmiWallTimer();
101
102   /* only give you work if I have more than 1 */
103   if (myload>0) {
104     int sendLoad;
105     receiver = msg->from_pe;
106     rank = CmiMyRank();
107     if (msg->to_rank != -1) rank = msg->to_rank;
108 #if IDLE_IMMEDIATE
109     /* try the lock */
110     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
111       CmiDelayImmediate();              /* postpone immediate message */
112       return;
113     }
114     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
115 #endif
116     sendLoad = myload / CpvAccess(numNeighbors) / 2;
117     if (sendLoad < 1) sendLoad = 1;
118     sendLoad = 1;
119     for (i=0; i<CpvAccess(numNeighbors); i++) 
120       if (CpvAccess(neighbors)[i].pe == receiver) break;
121     CmiAssert(i<CpvAccess(numNeighbors));
122     CpvAccess(neighbors)[i].load += sendLoad;
123     CldMultipleSend(receiver, sendLoad, rank, 0);
124 #if 0
125 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
126     /* this is dangerous since projections logging is not thread safe */
127     {
128     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
129     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
130     }
131 #endif
132 #endif
133   }
134   CmiFree(msg);
135 }
136
137 /* balancing by exchanging load among neighbors */
138
139 void CldSendLoad()
140 {
141   loadmsg msg;
142
143   msg.pe = CmiMyPe();
144   msg.load = CldLoad();
145   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
146   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
147   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
148 }
149
150 int CldMinAvg()
151 {
152   int sum=0, i;
153   static int start=-1;
154
155   if (start == -1)
156     start = CmiMyPe() % CpvAccess(numNeighbors);
157   CpvAccess(MinProc) = CpvAccess(neighbors)[start].pe;
158   CpvAccess(MinLoad) = CpvAccess(neighbors)[start].load;
159   sum = CpvAccess(neighbors)[start].load;
160   CpvAccess(Mindex) = start;
161   for (i=1; i<CpvAccess(numNeighbors); i++) {
162     start = (start+1) % CpvAccess(numNeighbors);
163     sum += CpvAccess(neighbors)[start].load;
164     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[start].load) {
165       CpvAccess(MinLoad) = CpvAccess(neighbors)[start].load;
166       CpvAccess(MinProc) = CpvAccess(neighbors)[start].pe;
167       CpvAccess(Mindex) = start;
168     }
169   }
170   start = (start+2) % CpvAccess(numNeighbors);
171   sum += CldLoad();
172   if (CldLoad() < CpvAccess(MinLoad)) {
173     CpvAccess(MinLoad) = CldLoad();
174     CpvAccess(MinProc) = CmiMyPe();
175   }
176   i = (int)(1.0 + (((float)sum) /((float)(CpvAccess(numNeighbors)+1))));
177   return i;
178 }
179
180 void CldBalance()
181 {
182   int i, j, overload, numToMove=0, avgLoad;
183   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
184
185 #ifndef CMK_OPTIMIZE
186   double startT = CmiWallTimer();
187 #endif
188
189 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
190   avgLoad = CldMinAvg();
191   overload = CldLoad() - avgLoad;
192   if (overload > CldCountTokens())
193     overload = CldCountTokens();
194
195   if (overload > MAXOVERLOAD) {
196     for (i=0; i<CpvAccess(numNeighbors); i++)
197       if (CpvAccess(neighbors)[i].load < avgLoad) {
198         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
199         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
200           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
201         numUnderAvg++;
202       }
203     if (numUnderAvg > 0)
204       for (i=0; ((i<CpvAccess(numNeighbors)) && (overload>0)); i++) {
205         j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
206         if (CpvAccess(neighbors)[j].load < avgLoad) {
207           numToMove = avgLoad - CpvAccess(neighbors)[j].load;
208           if (numToMove > overload)
209             numToMove = overload;
210           overload -= numToMove;
211           CpvAccess(neighbors)[j].load += numToMove;
212           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
213                           numToMove, CmiMyRank(), 
214 #if CMK_SMP
215                           0
216 #else
217                           1
218 #endif
219                           );
220         }
221       }
222   }
223   CldSendLoad();
224 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
225   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
226 #endif
227   CcdCallFnAfterOnPE((CcdVoidFn)CldBalance, NULL, PERIOD, CmiMyPe());
228
229 }
230
231 void CldLoadResponseHandler(loadmsg *msg)
232 {
233   int i;
234
235   for(i=0; i<CpvAccess(numNeighbors); i++)
236     if (CpvAccess(neighbors)[i].pe == msg->pe) {
237       CpvAccess(neighbors)[i].load = msg->load;
238       break;
239     }
240   CmiFree(msg);
241 }
242
243 void CldBalanceHandler(void *msg)
244 {
245   CldRestoreHandler(msg);
246   CldPutToken(msg);
247 }
248
249 void CldHandler(void *msg)
250 {
251   CldInfoFn ifn; CldPackFn pfn;
252   int len, queueing, priobits; unsigned int *prioptr;
253   
254   CldRestoreHandler(msg);
255   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
256   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
257   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
258 }
259
260 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
261 {
262   int len, queueing, priobits,i; unsigned int *prioptr;
263   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
264   CldPackFn pfn;
265   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
266   if (pfn) {
267     pfn(&msg);
268     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
269   }
270   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
271   CmiSetInfo(msg,infofn);
272   /*
273   for(i=0;i<npes;i++) {
274     CmiSyncSend(pes[i], len, msg);
275   }
276   CmiFree(msg);
277   */
278   CmiSyncListSendAndFree(npes, pes, len, msg);
279 }
280
281 void CldEnqueue(int pe, void *msg, int infofn)
282 {
283   int len, queueing, priobits, avg; unsigned int *prioptr;
284   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
285   CldPackFn pfn;
286
287   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
288     avg = CldMinAvg();
289     if (CldLoad() < avg)
290       pe = CmiMyPe();
291     else
292       pe = CpvAccess(MinProc);
293     /* always pack the message because the message may be move away
294        to a different processor later by CldGetToken() */
295     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
296     if (pfn) {
297        pfn(&msg);
298        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
299     }
300     if (pe != CmiMyPe()) {
301       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
302       CpvAccess(CldRelocatedMessages)++;
303       CmiSetInfo(msg,infofn);
304       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
305       CmiSyncSendAndFree(pe, len, msg);
306     }
307     else {
308       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
309       CmiSetInfo(msg,infofn);
310       CldPutToken(msg);
311     }
312   } 
313   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
314     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
315     CmiSetInfo(msg,infofn);
316     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
317   }
318   else {
319     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
320     if (pfn) {
321       pfn(&msg);
322       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
323     }
324     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
325     CmiSetInfo(msg,infofn);
326     if (pe==CLD_BROADCAST) 
327       CmiSyncBroadcastAndFree(len, msg);
328     else if (pe==CLD_BROADCAST_ALL)
329       CmiSyncBroadcastAllAndFree(len, msg);
330     else CmiSyncSendAndFree(pe, len, msg);
331   }
332 }
333
334 void CldNodeEnqueue(int node, void *msg, int infofn)
335 {
336   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
337   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
338   CldPackFn pfn;
339   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
340     avg = CldMinAvg();
341     if (CldLoad() < avg)
342       pe = CmiMyPe();
343     else
344       pe = CpvAccess(MinProc);
345     node = CmiNodeOf(pe);
346     if (node != CmiMyNode()){
347       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
348       CpvAccess(CldRelocatedMessages)++;
349       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
350       if (pfn) {
351         pfn(&msg);
352         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
353       }
354       CmiSetInfo(msg,infofn);
355       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
356       CmiSyncNodeSendAndFree(node, len, msg);
357     }
358     else {
359       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
360       CmiSetInfo(msg,infofn);
361       CldPutToken(msg);
362     }
363   }
364   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
365     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
366     CmiSetInfo(msg,infofn);
367     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
368   } 
369   else {
370     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
371     if (pfn) {
372       pfn(&msg);
373       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
374     }
375     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
376     CmiSetInfo(msg,infofn);
377     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
378     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
379     else CmiSyncNodeSendAndFree(node, len, msg);
380   }
381 }
382
383 void CldReadNeighborData()
384 {
385   FILE *fp;
386   char filename[25];
387   int i, *pes;
388   
389   if (CmiNumPes() <= 1)
390     return;
391   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
392   if ((fp = fopen(filename, "r")) == 0) 
393     {
394       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
395       return;
396     }
397   fscanf(fp, "%d", &CpvAccess(numNeighbors));
398   CpvAccess(neighbors) = 
399     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
400                                      sizeof(struct CldNeighborData));
401   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
402   for (i=0; i<CpvAccess(numNeighbors); i++) {
403     fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
404     pes[i] = CpvAccess(neighbors)[i].pe;
405     CpvAccess(neighbors)[i].load = 0;
406   }
407   fclose(fp);
408   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
409 }
410
411 static void CldComputeNeighborData()
412 {
413   int i, npe;
414   int *pes;
415   LBtopoFn topofn;
416   void *topo;
417
418   topofn = LBTopoLookup(_lbtopo);
419   if (topofn == NULL) {
420     char str[1024];
421     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
422     printoutTopo();
423     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
424     CmiAbort(str);
425   }
426   topo = topofn(CmiNumPes());
427   npe = getTopoMaxNeighbors(topo);
428   pes = (int *)malloc(npe*sizeof(int));
429   getTopoNeighbors(topo, CmiMyPe(), pes, &npe);
430 #if 0
431   {
432   char buf[512], *ptr;
433   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npe);
434   ptr = buf + strlen(buf);
435   for (i=0; i<npe; i++) {
436     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
437     sprintf(ptr, " %d ", pes[i]);
438     ptr += strlen(ptr);
439   }
440   strcat(ptr, "\n");
441   CmiPrintf(buf);
442   }
443 #endif
444
445   CpvAccess(numNeighbors) = npe;
446   CpvAccess(neighbors) = 
447     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
448                                      sizeof(struct CldNeighborData));
449   for (i=0; i<CpvAccess(numNeighbors); i++) {
450     CpvAccess(neighbors)[i].pe = pes[i];
451     CpvAccess(neighbors)[i].load = 0;
452   }
453   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
454   free(pes);
455 }
456
457 void CldGraphModuleInit(char **argv)
458 {
459   CpvInitialize(CldProcInfo, CldData);
460   CpvInitialize(int, numNeighbors);
461   CpvInitialize(int, MinLoad);
462   CpvInitialize(int, Mindex);
463   CpvInitialize(int, MinProc);
464   CpvInitialize(CmiGroup, neighborGroup);
465   CpvInitialize(CldNeighborData, neighbors);
466   CpvInitialize(int, CldBalanceHandlerIndex);
467   CpvInitialize(int, CldLoadResponseHandlerIndex);
468   CpvInitialize(int, CldAskLoadHandlerIndex);
469
470   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
471   CpvAccess(CldData)->lastCheck = -1;
472   CpvAccess(CldData)->sent = 0;
473 #ifndef CMK_OPTIMIZE
474   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
475   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
476   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
477 #endif
478
479   CpvAccess(MinLoad) = 0;
480   CpvAccess(Mindex) = 0;
481   CpvAccess(MinProc) = CmiMyPe();
482   CpvAccess(CldBalanceHandlerIndex) = 
483     CmiRegisterHandler(CldBalanceHandler);
484   CpvAccess(CldLoadResponseHandlerIndex) = 
485     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
486   CpvAccess(CldAskLoadHandlerIndex) = 
487     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
488
489   /* communication thread */
490   if (CmiMyRank() == CmiMyNodeSize())  return;
491
492   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
493   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
494
495   if (CmiNumPes() > 1) {
496 #if 0
497     FILE *fp;
498     char filename[20];
499   
500     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
501     if ((fp = fopen(filename, "r")) == 0)
502       {
503         if (CmiMyPe() == 0) {
504           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
505           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
506           CmiPrintf("done.\n");
507         }
508         else {
509           while (!(fp = fopen(filename, "r"))) ;
510           fclose(fp);
511         }
512       }
513     else fclose(fp);
514     CldReadNeighborData();
515 #endif
516     CldComputeNeighborData();
517     CldBalance();
518   }
519
520 #if 1
521   /* register idle handlers - when idle, keep asking work from neighbors */
522   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
523       (CcdVoidFn) CldStillIdle, NULL);
524   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
525       (CcdVoidFn) CldStillIdle, NULL);
526 #endif
527 }
528
529 void CldModuleInit(char **argv)
530 {
531   CpvInitialize(int, CldHandlerIndex);
532   CpvInitialize(int, CldRelocatedMessages);
533   CpvInitialize(int, CldLoadBalanceMessages);
534   CpvInitialize(int, CldMessageChunks);
535   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
536   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
537   CpvAccess(CldMessageChunks) = 0;
538
539   CpvAccess(CldLoadNotify) = 1;
540
541   CldModuleGeneralInit(argv);
542   CldGraphModuleInit(argv);
543 }