added a command line option +workstealing to enable work stealing. By default it...
[charm.git] / src / conv-ldb / cldb.neighbor.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.neighbor.h"
5 #include "queueing.h"
6 #include "cldb.h"
7 #include "topology.h"
8
9 #define IDLE_IMMEDIATE          1
10 #define TRACE_USEREVENTS        0
11
12 #define PERIOD 20                /* default: 30 */
13 #define MAXOVERLOAD 1
14
15 typedef struct CldProcInfo_s {
16   double lastCheck;
17   int    sent;                  /* flag to disable idle work request */
18   int    balanceEvt;            /* user event for balancing */
19   int    idleEvt;               /* user event for idle balancing */
20   int    idleprocEvt;           /* user event for processing idle req */
21 } *CldProcInfo;
22
23 extern char *_lbtopo;                   /* topology name string */
24 int _lbsteal = 0;                       /* work stealing flag */
25
26 void gengraph(int, int, int, int *, int *);
27
28 CpvStaticDeclare(CldProcInfo, CldData);
29 CpvStaticDeclare(int, CldLoadResponseHandlerIndex);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, MinLoad);
32 CpvStaticDeclare(int, MinProc);
33 CpvStaticDeclare(int, Mindex);
34
35 void LoadNotifyFn(int l)
36 {
37   CldProcInfo  cldData = CpvAccess(CldData);
38   cldData->sent = 0;
39 }
40
41 char *CldGetStrategy(void)
42 {
43   return "neighbor";
44 }
45
46 /* since I am idle, ask for work from neighbors */
47 static void CldBeginIdle(void *dummy)
48 {
49   CpvAccess(CldData)->lastCheck = CmiWallTimer();
50 }
51
52 static void CldEndIdle(void *dummy)
53 {
54   CpvAccess(CldData)->lastCheck = -1;
55 }
56
57 static void CldStillIdle(void *dummy)
58 {
59   int i;
60   double startT;
61   requestmsg msg;
62   int myload;
63   CldProcInfo  cldData = CpvAccess(CldData);
64
65   double now = CmiWallTimer();
66   double lt = cldData->lastCheck;
67   /* only ask for work every 20ms */
68   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
69   cldData->lastCheck = now;
70
71   myload = CldLoad();
72   CmiAssert(myload == 0);
73   if (myload > 0) return;
74
75   msg.from_pe = CmiMyPe();
76   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
77 #if ! IDLE_IMMEDIATE
78   msg.to_rank = -1;
79   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(requestmsg), &msg);
80 #else
81   /* fixme */
82   CmiBecomeImmediate(&msg);
83   for (i=0; i<CpvAccess(numNeighbors); i++) {
84     msg.to_rank = CmiRankOf(CpvAccess(neighbors)[i].pe);
85     CmiSyncNodeSend(CmiNodeOf(CpvAccess(neighbors)[i].pe),sizeof(requestmsg),(char *)&msg);
86   }
87 #endif
88   cldData->sent = 1;
89
90 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
91   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
92 #endif
93 }
94
95 /* immediate message handler, work at node level */
96 /* send some work to requested proc */
97 static void CldAskLoadHandler(requestmsg *msg)
98 {
99   int receiver, rank, recvIdx, i;
100   int myload = CldLoad();
101   double now = CmiWallTimer();
102
103   /* only give you work if I have more than 1 */
104   if (myload>0) {
105     int sendLoad;
106     receiver = msg->from_pe;
107     rank = CmiMyRank();
108     if (msg->to_rank != -1) rank = msg->to_rank;
109 #if IDLE_IMMEDIATE
110     /* try the lock */
111     if (CmiTryLock(CpvAccessOther(cldLock, rank))) {
112       CmiDelayImmediate();              /* postpone immediate message */
113       return;
114     }
115     CmiUnlock(CpvAccessOther(cldLock, rank));  /* release lock, grab later */
116 #endif
117     sendLoad = myload / CpvAccess(numNeighbors) / 2;
118     if (sendLoad < 1) sendLoad = 1;
119     sendLoad = 1;
120     for (i=0; i<CpvAccess(numNeighbors); i++) 
121       if (CpvAccess(neighbors)[i].pe == receiver) break;
122     CmiAssert(i<CpvAccess(numNeighbors));
123     CpvAccess(neighbors)[i].load += sendLoad;
124     CldMultipleSend(receiver, sendLoad, rank, 0);
125 #if 0
126 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
127     /* this is dangerous since projections logging is not thread safe */
128     {
129     CldProcInfo  cldData = CpvAccessOther(CldData, rank);
130     traceUserBracketEvent(cldData->idleprocEvt, now, CmiWallTimer());
131     }
132 #endif
133 #endif
134   }
135   CmiFree(msg);
136 }
137
138 /* balancing by exchanging load among neighbors */
139
140 void CldSendLoad()
141 {
142   loadmsg msg;
143
144   msg.pe = CmiMyPe();
145   msg.load = CldLoad();
146   CmiSetHandler(&msg, CpvAccess(CldLoadResponseHandlerIndex));
147   CmiSyncMulticast(CpvAccess(neighborGroup), sizeof(loadmsg), &msg);
148   CpvAccess(CldLoadBalanceMessages) += CpvAccess(numNeighbors);
149 }
150
151 int CldMinAvg()
152 {
153   int sum=0, i;
154   static int start=-1;
155
156   if (start == -1)
157     start = CmiMyPe() % CpvAccess(numNeighbors);
158   CpvAccess(MinProc) = CpvAccess(neighbors)[start].pe;
159   CpvAccess(MinLoad) = CpvAccess(neighbors)[start].load;
160   sum = CpvAccess(neighbors)[start].load;
161   CpvAccess(Mindex) = start;
162   for (i=1; i<CpvAccess(numNeighbors); i++) {
163     start = (start+1) % CpvAccess(numNeighbors);
164     sum += CpvAccess(neighbors)[start].load;
165     if (CpvAccess(MinLoad) > CpvAccess(neighbors)[start].load) {
166       CpvAccess(MinLoad) = CpvAccess(neighbors)[start].load;
167       CpvAccess(MinProc) = CpvAccess(neighbors)[start].pe;
168       CpvAccess(Mindex) = start;
169     }
170   }
171   start = (start+2) % CpvAccess(numNeighbors);
172   sum += CldLoad();
173   if (CldLoad() < CpvAccess(MinLoad)) {
174     CpvAccess(MinLoad) = CldLoad();
175     CpvAccess(MinProc) = CmiMyPe();
176   }
177   i = (int)(1.0 + (((float)sum) /((float)(CpvAccess(numNeighbors)+1))));
178   return i;
179 }
180
181 void CldBalance()
182 {
183   int i, j, overload, numToMove=0, avgLoad;
184   int totalUnderAvg=0, numUnderAvg=0, maxUnderAvg=0;
185
186 #ifndef CMK_OPTIMIZE
187   double startT = CmiWallTimer();
188 #endif
189
190 /*CmiPrintf("[%d] CldBalance %f\n", CmiMyPe(), startT);*/
191   avgLoad = CldMinAvg();
192   overload = CldLoad() - avgLoad;
193   if (overload > CldCountTokens())
194     overload = CldCountTokens();
195
196   if (overload > MAXOVERLOAD) {
197     for (i=0; i<CpvAccess(numNeighbors); i++)
198       if (CpvAccess(neighbors)[i].load < avgLoad) {
199         totalUnderAvg += avgLoad-CpvAccess(neighbors)[i].load;
200         if (avgLoad - CpvAccess(neighbors)[i].load > maxUnderAvg)
201           maxUnderAvg = avgLoad - CpvAccess(neighbors)[i].load;
202         numUnderAvg++;
203       }
204     if (numUnderAvg > 0)
205       for (i=0; ((i<CpvAccess(numNeighbors)) && (overload>0)); i++) {
206         j = (i+CpvAccess(Mindex))%CpvAccess(numNeighbors);
207         if (CpvAccess(neighbors)[j].load < avgLoad) {
208           numToMove = avgLoad - CpvAccess(neighbors)[j].load;
209           if (numToMove > overload)
210             numToMove = overload;
211           overload -= numToMove;
212           CpvAccess(neighbors)[j].load += numToMove;
213           CldMultipleSend(CpvAccess(neighbors)[j].pe, 
214                           numToMove, CmiMyRank(), 
215 #if CMK_SMP
216                           0
217 #else
218                           1
219 #endif
220                           );
221         }
222       }
223   }
224   CldSendLoad();
225 #if !defined(CMK_OPTIMIZE) && TRACE_USEREVENTS
226   traceUserBracketEvent(CpvAccess(CldData)->balanceEvt, startT, CmiWallTimer());
227 #endif
228   CcdCallFnAfterOnPE((CcdVoidFn)CldBalance, NULL, PERIOD, CmiMyPe());
229
230 }
231
232 void CldLoadResponseHandler(loadmsg *msg)
233 {
234   int i;
235
236   for(i=0; i<CpvAccess(numNeighbors); i++)
237     if (CpvAccess(neighbors)[i].pe == msg->pe) {
238       CpvAccess(neighbors)[i].load = msg->load;
239       break;
240     }
241   CmiFree(msg);
242 }
243
244 void CldBalanceHandler(void *msg)
245 {
246   CldRestoreHandler(msg);
247   CldPutToken(msg);
248 }
249
250 void CldHandler(void *msg)
251 {
252   CldInfoFn ifn; CldPackFn pfn;
253   int len, queueing, priobits; unsigned int *prioptr;
254   
255   CldRestoreHandler(msg);
256   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
257   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
258   CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
259 }
260
261 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
262 {
263   int len, queueing, priobits,i; unsigned int *prioptr;
264   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
265   CldPackFn pfn;
266   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
267   if (pfn) {
268     pfn(&msg);
269     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
270   }
271   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
272   CmiSetInfo(msg,infofn);
273   /*
274   for(i=0;i<npes;i++) {
275     CmiSyncSend(pes[i], len, msg);
276   }
277   CmiFree(msg);
278   */
279   CmiSyncListSendAndFree(npes, pes, len, msg);
280 }
281
282 void CldEnqueue(int pe, void *msg, int infofn)
283 {
284   int len, queueing, priobits, avg; unsigned int *prioptr;
285   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
286   CldPackFn pfn;
287
288   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
289     avg = CldMinAvg();
290     if (CldLoad() < avg)
291       pe = CmiMyPe();
292     else
293       pe = CpvAccess(MinProc);
294     /* always pack the message because the message may be move away
295        to a different processor later by CldGetToken() */
296     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
297     if (pfn) {
298        pfn(&msg);
299        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
300     }
301     if (pe != CmiMyPe()) {
302       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
303       CpvAccess(CldRelocatedMessages)++;
304       CmiSetInfo(msg,infofn);
305       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
306       CmiSyncSendAndFree(pe, len, msg);
307     }
308     else {
309       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
310       CmiSetInfo(msg,infofn);
311       CldPutToken(msg);
312     }
313   } 
314   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
315     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
316     CmiSetInfo(msg,infofn);
317     CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
318   }
319   else {
320     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
321     if (pfn) {
322       pfn(&msg);
323       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
324     }
325     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
326     CmiSetInfo(msg,infofn);
327     if (pe==CLD_BROADCAST) 
328       CmiSyncBroadcastAndFree(len, msg);
329     else if (pe==CLD_BROADCAST_ALL)
330       CmiSyncBroadcastAllAndFree(len, msg);
331     else CmiSyncSendAndFree(pe, len, msg);
332   }
333 }
334
335 void CldNodeEnqueue(int node, void *msg, int infofn)
336 {
337   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
338   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
339   CldPackFn pfn;
340   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
341     avg = CldMinAvg();
342     if (CldLoad() < avg)
343       pe = CmiMyPe();
344     else
345       pe = CpvAccess(MinProc);
346     node = CmiNodeOf(pe);
347     if (node != CmiMyNode()){
348       CpvAccess(neighbors)[CpvAccess(Mindex)].load++;
349       CpvAccess(CldRelocatedMessages)++;
350       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
351       if (pfn) {
352         pfn(&msg);
353         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
354       }
355       CmiSetInfo(msg,infofn);
356       CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
357       CmiSyncNodeSendAndFree(node, len, msg);
358     }
359     else {
360       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
361       CmiSetInfo(msg,infofn);
362       CldPutToken(msg);
363     }
364   }
365   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
366     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
367     CmiSetInfo(msg,infofn);
368     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
369   } 
370   else {
371     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
372     if (pfn) {
373       pfn(&msg);
374       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
375     }
376     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
377     CmiSetInfo(msg,infofn);
378     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
379     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
380     else CmiSyncNodeSendAndFree(node, len, msg);
381   }
382 }
383
384 void CldReadNeighborData()
385 {
386   FILE *fp;
387   char filename[25];
388   int i, *pes;
389   
390   if (CmiNumPes() <= 1)
391     return;
392   sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
393   if ((fp = fopen(filename, "r")) == 0) 
394     {
395       CmiError("Error opening graph init file on PE: %d\n", CmiMyPe());
396       return;
397     }
398   fscanf(fp, "%d", &CpvAccess(numNeighbors));
399   CpvAccess(neighbors) = 
400     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
401                                      sizeof(struct CldNeighborData));
402   pes = (int *)calloc(CpvAccess(numNeighbors), sizeof(int));
403   for (i=0; i<CpvAccess(numNeighbors); i++) {
404     fscanf(fp, "%d", &(CpvAccess(neighbors)[i].pe));
405     pes[i] = CpvAccess(neighbors)[i].pe;
406     CpvAccess(neighbors)[i].load = 0;
407   }
408   fclose(fp);
409   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
410 }
411
412 static void CldComputeNeighborData()
413 {
414   int i, npe;
415   int *pes;
416   LBtopoFn topofn;
417   void *topo;
418
419   topofn = LBTopoLookup(_lbtopo);
420   if (topofn == NULL) {
421     char str[1024];
422     CmiPrintf("SeedLB> Fatal error: Unknown topology: %s. Choose from:\n", _lbtopo);
423     printoutTopo();
424     sprintf(str, "SeedLB> Fatal error: Unknown topology: %s", _lbtopo);
425     CmiAbort(str);
426   }
427   topo = topofn(CmiNumPes());
428   npe = getTopoMaxNeighbors(topo);
429   pes = (int *)malloc(npe*sizeof(int));
430   getTopoNeighbors(topo, CmiMyPe(), pes, &npe);
431 #if 0
432   {
433   char buf[512], *ptr;
434   sprintf(buf, "Neighors for PE %d (%d): ", CmiMyPe(), npe);
435   ptr = buf + strlen(buf);
436   for (i=0; i<npe; i++) {
437     CmiAssert(pes[i] < CmiNumPes() && pes[i] != CmiMyPe());
438     sprintf(ptr, " %d ", pes[i]);
439     ptr += strlen(ptr);
440   }
441   strcat(ptr, "\n");
442   CmiPrintf(buf);
443   }
444 #endif
445
446   CpvAccess(numNeighbors) = npe;
447   CpvAccess(neighbors) = 
448     (struct CldNeighborData *)calloc(CpvAccess(numNeighbors), 
449                                      sizeof(struct CldNeighborData));
450   for (i=0; i<CpvAccess(numNeighbors); i++) {
451     CpvAccess(neighbors)[i].pe = pes[i];
452     CpvAccess(neighbors)[i].load = 0;
453   }
454   CpvAccess(neighborGroup) = CmiEstablishGroup(CpvAccess(numNeighbors), pes);
455   free(pes);
456 }
457
458 void CldGraphModuleInit(char **argv)
459 {
460   CpvInitialize(CldProcInfo, CldData);
461   CpvInitialize(int, numNeighbors);
462   CpvInitialize(int, MinLoad);
463   CpvInitialize(int, Mindex);
464   CpvInitialize(int, MinProc);
465   CpvInitialize(CmiGroup, neighborGroup);
466   CpvInitialize(CldNeighborData, neighbors);
467   CpvInitialize(int, CldBalanceHandlerIndex);
468   CpvInitialize(int, CldLoadResponseHandlerIndex);
469   CpvInitialize(int, CldAskLoadHandlerIndex);
470
471   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
472   CpvAccess(CldData)->lastCheck = -1;
473   CpvAccess(CldData)->sent = 0;
474 #ifndef CMK_OPTIMIZE
475   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
476   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
477   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
478 #endif
479
480   CpvAccess(MinLoad) = 0;
481   CpvAccess(Mindex) = 0;
482   CpvAccess(MinProc) = CmiMyPe();
483   CpvAccess(CldBalanceHandlerIndex) = 
484     CmiRegisterHandler(CldBalanceHandler);
485   CpvAccess(CldLoadResponseHandlerIndex) = 
486     CmiRegisterHandler((CmiHandler)CldLoadResponseHandler);
487   CpvAccess(CldAskLoadHandlerIndex) = 
488     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
489
490   /* communication thread */
491   if (CmiMyRank() == CmiMyNodeSize())  return;
492
493   CmiGetArgStringDesc(argv, "+LBTopo", &_lbtopo, "define load balancing topology");
494   if (CmiMyPe() == 0) CmiPrintf("Seed LB> Topology %s\n", _lbtopo);
495
496   if (CmiNumPes() > 1) {
497 #if 0
498     FILE *fp;
499     char filename[20];
500   
501     sprintf(filename, "graph%d/graph%d", CmiNumPes(), CmiMyPe());
502     if ((fp = fopen(filename, "r")) == 0)
503       {
504         if (CmiMyPe() == 0) {
505           CmiPrintf("No proper graph%d directory exists in current directory.\n Generating...  ", CmiNumPes());
506           gengraph(CmiNumPes(), (int)(sqrt(CmiNumPes())+0.5), 234);
507           CmiPrintf("done.\n");
508         }
509         else {
510           while (!(fp = fopen(filename, "r"))) ;
511           fclose(fp);
512         }
513       }
514     else fclose(fp);
515     CldReadNeighborData();
516 #endif
517     CldComputeNeighborData();
518     CldBalance();
519   }
520
521 #if 1
522   CmiGetArgStringDesc(argv, "+workstealing", &_lbsteal, "Enable work stealing at idle time");
523   if (_lbsteal) {
524   /* register idle handlers - when idle, keep asking work from neighbors */
525   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
526       (CcdVoidFn) CldStillIdle, NULL);
527   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
528       (CcdVoidFn) CldStillIdle, NULL);
529   }
530 #endif
531 }
532
533 void CldModuleInit(char **argv)
534 {
535   CpvInitialize(int, CldHandlerIndex);
536   CpvInitialize(int, CldRelocatedMessages);
537   CpvInitialize(int, CldLoadBalanceMessages);
538   CpvInitialize(int, CldMessageChunks);
539   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
540   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
541   CpvAccess(CldMessageChunks) = 0;
542
543   CpvAccess(CldLoadNotify) = 1;
544
545   CldModuleGeneralInit(argv);
546   CldGraphModuleInit(argv);
547 }