Remove archaic CVS keyword header comment blocks
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define TRACE_USEREVENTS        1
9 #define LOADTHRESH              3
10
11 typedef struct CldProcInfo_s {
12   int    askEvt;                /* user event for askLoad */
13   int    askNoEvt;              /* user event for askNoLoad */
14   int    idleEvt;               /* user event for idle balancing */
15 } *CldProcInfo;
16
17 static int WS_Threshold = -1;
18 static int _steal_prio = 0;
19 static int _stealonly1 = 0;
20 static int _steal_immediate = 0;
21 static int workstealingproactive = 0;
22
23 CpvStaticDeclare(CldProcInfo, CldData);
24 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
25 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
26 CpvStaticDeclare(int, isStealing);
27
28
29 char *CldGetStrategy(void)
30 {
31   return "work stealing";
32 }
33
34
35 static void StealLoad()
36 {
37   int i;
38   double now;
39   requestmsg msg;
40   int  victim;
41   int mype;
42   int numpes;
43
44   if (CpvAccess(isStealing)) return;    /* already stealing, return */
45
46   CpvAccess(isStealing) = 1;
47
48 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
49   now = CmiWallTimer();
50 #endif
51
52   mype = CmiMyPe();
53   msg.from_pe = mype;
54   numpes = CmiNumPes();
55   do{
56       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
57   }while(victim == mype);
58
59   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
60 #if CMK_IMMEDIATE_MSG
61   /* fixme */
62   if (_steal_immediate) CmiBecomeImmediate(&msg);
63 #endif
64   /* msg.to_rank = CmiRankOf(victim); */
65   msg.to_pe = victim;
66   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
67   
68 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
69   traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
70 #endif
71 }
72
73 void LoadNotifyFn(int l)
74 {
75     if(CldCountTokens() <= WS_Threshold)
76         StealLoad();
77 }
78
79 /* since I am idle, ask for work from neighbors */
80 static void CldBeginIdle(void *dummy)
81 {
82     //if (CldCountTokens() == 0) StealLoad();
83     CmiAssert(CldCountTokens()==0);
84     StealLoad();
85 }
86
87 /* immediate message handler, work at node level */
88 /* send some work to requested proc */
89 static void CldAskLoadHandler(requestmsg *msg)
90 {
91   int receiver, rank, recvIdx, i;
92   int myload, sendLoad;
93   double now;
94   /* int myload = CldLoad(); */
95
96 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
97   now = CmiWallTimer();
98 #endif
99
100   /* rank = msg->to_rank; */
101   CmiAssert(msg->to_pe!=-1);
102   rank = CmiRankOf(msg->to_pe);
103   CmiAssert(rank!=-1);
104   myload = CldCountTokensRank(rank);
105
106   receiver = msg->from_pe;
107   /* only give you work if I have more than 1 */
108   if (myload>LOADTHRESH) {
109       if(_stealonly1) sendLoad = 1;
110       else sendLoad = myload/2; 
111       if(sendLoad > 0) {
112 #if ! CMK_USE_IBVERBS
113         if (_steal_prio)
114           CldMultipleSendPrio(receiver, sendLoad, rank, 0);
115         else
116           CldMultipleSend(receiver, sendLoad, rank, 0);
117 #else
118           CldSimpleMultipleSend(receiver, sendLoad, rank);
119 #endif
120       }
121       CmiFree(msg);
122   }else
123   {     /* send ack indicating there is no task */
124       int pe = msg->to_pe;
125       msg->to_pe = msg->from_pe;
126       msg->from_pe = pe;
127       /*msg->to_rank = CmiMyRank(); */
128
129       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
130       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
131   }
132
133 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
134   traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
135 #endif
136 }
137
138 void  CldAckNoTaskHandler(requestmsg *msg)
139 {
140   double now;
141   int victim; 
142   /* int notaskpe = msg->from_pe; */
143   int mype = CmiMyPe();
144   int numpes = CmiNumPes();
145
146 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
147   now = CmiWallTimer();
148 #endif
149
150   do{
151       /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
152       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
153   } while(victim == mype);
154
155   /* reuse msg */
156 #if CMK_IMMEDIATE_MSG
157   /* fixme */
158   if (_steal_immediate) CmiBecomeImmediate(msg);
159 #endif
160   /*msg->to_rank = CmiRankOf(victim); */
161   msg->to_pe = victim;
162   msg->from_pe = mype;
163   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
164   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
165
166   CpvAccess(isStealing) = 1;
167
168 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
169   traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
170 #endif
171 }
172
173 void CldHandler(void *msg)
174 {
175   CldInfoFn ifn; CldPackFn pfn;
176   int len, queueing, priobits; unsigned int *prioptr;
177   
178   CldRestoreHandler(msg);
179   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
180   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
181   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
182   /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
183 }
184
185 #define CldPUTTOKEN(msg)  \
186            if (_steal_prio)   \
187              CldPutTokenPrio(msg);   \
188            else            \
189              CldPutToken(msg);
190
191 void CldBalanceHandler(void *msg)
192 {
193   CldRestoreHandler(msg);
194   CldPUTTOKEN(msg);
195   CpvAccess(isStealing) = 0;      /* fixme: this may not be right */
196 }
197
198 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
199 {
200   int len, queueing, priobits,i; unsigned int *prioptr;
201   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
202   CldPackFn pfn;
203   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
204   if (pfn) {
205     pfn(&msg);
206     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
207   }
208   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
209   CmiSetInfo(msg,infofn);
210
211   CmiSyncMulticastAndFree(grp, len, msg);
212 }
213
214 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
215 {
216   int len, queueing, priobits,i; unsigned int *prioptr;
217   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
218   CldPackFn pfn;
219   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
220   if (pfn) {
221     pfn(&msg);
222     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
223   }
224   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
225   CmiSetInfo(msg,infofn);
226   CmiSyncListSendAndFree(npes, pes, len, msg);
227 }
228
229 void CldEnqueue(int pe, void *msg, int infofn)
230 {
231   int len, queueing, priobits, avg; unsigned int *prioptr;
232   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
233   CldPackFn pfn;
234
235   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
236       pe = CmiMyPe();
237     /* always pack the message because the message may be move away
238        to a different processor later by CldGetToken() */
239     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
240     if (pfn && CmiNumNodes()>1) {
241        pfn(&msg);
242        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
243     }
244     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
245     CmiSetInfo(msg,infofn);
246     CldPUTTOKEN(msg);
247   } 
248   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
249     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
250     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
251   }
252   else {
253     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
254     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
255       pfn(&msg);
256       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
257     }
258     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
259     CmiSetInfo(msg,infofn);
260     if (pe==CLD_BROADCAST) 
261       CmiSyncBroadcastAndFree(len, msg);
262     else if (pe==CLD_BROADCAST_ALL)
263       CmiSyncBroadcastAllAndFree(len, msg);
264     else CmiSyncSendAndFree(pe, len, msg);
265   }
266 }
267
268 void CldNodeEnqueue(int node, void *msg, int infofn)
269 {
270   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
271   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
272   CldPackFn pfn;
273   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
274       pe = CmiMyPe();
275       node = CmiNodeOf(pe);
276       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
277       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
278   }
279   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
280     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
281     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
282   } 
283   else {
284     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
285     if (pfn) {
286         pfn(&msg);
287         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
288     }
289     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
290     CmiSetInfo(msg,infofn);
291     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
292     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
293     else CmiSyncNodeSendAndFree(node, len, msg);
294   }
295 }
296
297
298 void CldGraphModuleInit(char **argv)
299 {
300   CpvInitialize(CldProcInfo, CldData);
301   CpvInitialize(int, CldAskLoadHandlerIndex);
302   CpvInitialize(int, CldAckNoTaskHandlerIndex);
303   CpvInitialize(int, CldBalanceHandlerIndex);
304
305   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
306 #if CMK_TRACE_ENABLED
307   CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
308   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
309   CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
310 #endif
311
312   CpvAccess(CldBalanceHandlerIndex) = 
313     CmiRegisterHandler(CldBalanceHandler);
314   CpvAccess(CldAskLoadHandlerIndex) = 
315     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
316   
317   CpvAccess(CldAckNoTaskHandlerIndex) = 
318     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
319
320   /* communication thread */
321   if (CmiMyRank() == CmiMyNodeSize())  return;
322
323   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
324  
325   if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
326   {
327       CmiAssert(WS_Threshold>=0);
328   }
329
330   _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
331
332   _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
333
334   /* register idle handlers - when idle, keep asking work from neighbors */
335   if(CmiNumPes() > 1)
336     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
337       (CcdVoidFn) CldBeginIdle, NULL);
338   if(WS_Threshold >= 0 && CmiMyPe() == 0)
339       CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
340 #if CMK_IMMEDIATE_MSG
341   if(_steal_immediate && CmiMyPe() == 0)
342       CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
343 #endif
344 }
345
346
347 void CldModuleInit(char **argv)
348 {
349   CpvInitialize(int, CldHandlerIndex);
350   CpvInitialize(int, CldRelocatedMessages);
351   CpvInitialize(int, CldLoadBalanceMessages);
352   CpvInitialize(int, CldMessageChunks);
353   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
354   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
355   CpvAccess(CldMessageChunks) = 0;
356
357   CldModuleGeneralInit(argv);
358   CldGraphModuleInit(argv);
359
360   CpvAccess(CldLoadNotify) = 1;
361
362   CpvInitialize(int, isStealing);
363   CpvAccess(isStealing) = 0;
364 }
365
366 void CldCallback()
367 {}