build: fix travis MPI/SMP build
[charm.git] / src / conv-ldb / cldb.workstealing.C
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define TRACE_USEREVENTS        1
9 #define LOADTHRESH              3
10
11 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed);
12
13 typedef struct CldProcInfo_s {
14   int    askEvt;                /* user event for askLoad */
15   int    askNoEvt;              /* user event for askNoLoad */
16   int    idleEvt;               /* user event for idle balancing */
17 } *CldProcInfo;
18
19 static int WS_Threshold = -1;
20 static int _steal_prio = 0;
21 static int _stealonly1 = 0;
22 static int _steal_immediate = 0;
23 static int workstealingproactive = 0;
24
25 CpvStaticDeclare(CldProcInfo, CldData);
26 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
27 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
28 CpvStaticDeclare(int, isStealing);
29
30
31 const char *CldGetStrategy(void)
32 {
33   return "work stealing";
34 }
35
36
37 static void StealLoad(void)
38 {
39   int i;
40   double now;
41   requestmsg msg;
42   int  victim;
43   int mype;
44   int numpes;
45
46   if (CpvAccess(isStealing)) return;    /* already stealing, return */
47
48   CpvAccess(isStealing) = 1;
49
50 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
51   now = CmiWallTimer();
52 #endif
53
54   mype = CmiMyPe();
55   msg.from_pe = mype;
56   numpes = CmiNumPes();
57   do{
58       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
59   }while(victim == mype);
60
61   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
62 #if CMK_IMMEDIATE_MSG
63   /* fixme */
64   if (_steal_immediate) CmiBecomeImmediate(&msg);
65 #endif
66   /* msg.to_rank = CmiRankOf(victim); */
67   msg.to_pe = victim;
68   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
69   
70 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
71   traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
72 #endif
73 }
74
75 void LoadNotifyFn(int l)
76 {
77     if(CldCountTokens() <= WS_Threshold)
78         StealLoad();
79 }
80
81 /* since I am idle, ask for work from neighbors */
82 static void CldBeginIdle(void *dummy)
83 {
84     //if (CldCountTokens() == 0) StealLoad();
85     CmiAssert(CldCountTokens()==0);
86     StealLoad();
87 }
88
89 /* immediate message handler, work at node level */
90 /* send some work to requested proc */
91 static void CldAskLoadHandler(requestmsg *msg)
92 {
93   int receiver, rank, recvIdx, i;
94   int myload, sendLoad;
95   double now;
96   /* int myload = CldLoad(); */
97
98 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
99   now = CmiWallTimer();
100 #endif
101
102   /* rank = msg->to_rank; */
103   CmiAssert(msg->to_pe!=-1);
104   rank = CmiRankOf(msg->to_pe);
105   CmiAssert(rank!=-1);
106   myload = CldCountTokensRank(rank);
107
108   receiver = msg->from_pe;
109   /* only give you work if I have more than 1 */
110   if (myload>LOADTHRESH) {
111       if(_stealonly1) sendLoad = 1;
112       else sendLoad = myload/2; 
113       if(sendLoad > 0) {
114 #if ! CMK_USE_IBVERBS
115         if (_steal_prio)
116           CldMultipleSendPrio(receiver, sendLoad, rank, 0);
117         else
118           CldMultipleSend(receiver, sendLoad, rank, 0);
119 #else
120           CldSimpleMultipleSend(receiver, sendLoad, rank);
121 #endif
122       }
123       CmiFree(msg);
124   }else
125   {     /* send ack indicating there is no task */
126       int pe = msg->to_pe;
127       msg->to_pe = msg->from_pe;
128       msg->from_pe = pe;
129       /*msg->to_rank = CmiMyRank(); */
130
131       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
132       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
133   }
134
135 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
136   traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
137 #endif
138 }
139
140 void  CldAckNoTaskHandler(requestmsg *msg)
141 {
142   double now;
143   int victim; 
144   /* int notaskpe = msg->from_pe; */
145   int mype = CmiMyPe();
146   int numpes = CmiNumPes();
147
148 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
149   now = CmiWallTimer();
150 #endif
151
152   do{
153       /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
154       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
155   } while(victim == mype);
156
157   /* reuse msg */
158 #if CMK_IMMEDIATE_MSG
159   /* fixme */
160   if (_steal_immediate) CmiBecomeImmediate(msg);
161 #endif
162   /*msg->to_rank = CmiRankOf(victim); */
163   msg->to_pe = victim;
164   msg->from_pe = mype;
165   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
166   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
167
168   CpvAccess(isStealing) = 1;
169
170 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
171   traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
172 #endif
173 }
174
175 void CldHandler(void *msg)
176 {
177   CldInfoFn ifn; CldPackFn pfn;
178   int len, queueing, priobits; unsigned int *prioptr;
179   
180   CldRestoreHandler((char *)msg);
181   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
182   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
183   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
184   /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
185 }
186
187 #define CldPUTTOKEN(msg)  \
188            if (_steal_prio)   \
189              CldPutTokenPrio(msg);   \
190            else            \
191              CldPutToken(msg);
192
193 void CldBalanceHandler(void *msg)
194 {
195   CldRestoreHandler((char *)msg);
196   CldPUTTOKEN((char *)msg);
197   CpvAccess(isStealing) = 0;      /* fixme: this may not be right */
198 }
199
200 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
201 {
202   int len, queueing, priobits,i; unsigned int *prioptr;
203   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
204   CldPackFn pfn;
205   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
206   if (pfn) {
207     pfn(&msg);
208     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
209   }
210   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
211   CmiSetInfo(msg,infofn);
212
213   CmiSyncMulticastAndFree(grp, len, msg);
214 }
215
216 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
217 {
218   int len, queueing, priobits,i; unsigned int *prioptr;
219   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
220   CldPackFn pfn;
221   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
222   if (pfn) {
223     pfn(&msg);
224     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
225   }
226   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
227   CmiSetInfo(msg,infofn);
228   CmiSyncListSendAndFree(npes, pes, len, msg);
229 }
230
231 void CldEnqueue(int pe, void *msg, int infofn)
232 {
233   int len, queueing, priobits, avg; unsigned int *prioptr;
234   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
235   CldPackFn pfn;
236
237   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
238       pe = CmiMyPe();
239     /* always pack the message because the message may be move away
240        to a different processor later by CldGetToken() */
241     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
242     if (pfn && CmiNumNodes()>1) {
243        pfn(&msg);
244        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
245     }
246     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
247     CmiSetInfo(msg,infofn);
248     CldPUTTOKEN((char *)msg);
249   } 
250   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
251     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
252     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
253   }
254   else {
255     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
256     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
257       pfn(&msg);
258       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
259     }
260     CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
261     CmiSetInfo(msg,infofn);
262     if (pe==CLD_BROADCAST) 
263       CmiSyncBroadcastAndFree(len, msg);
264     else if (pe==CLD_BROADCAST_ALL)
265       CmiSyncBroadcastAllAndFree(len, msg);
266     else CmiSyncSendAndFree(pe, len, msg);
267   }
268 }
269
270 void CldNodeEnqueue(int node, void *msg, int infofn)
271 {
272   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
273   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
274   CldPackFn pfn;
275   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
276       pe = CmiMyPe();
277       node = CmiNodeOf(pe);
278       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
279       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
280   }
281   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
282     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
283     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
284   } 
285   else {
286     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
287     if (pfn) {
288         pfn(&msg);
289         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
290     }
291     CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
292     CmiSetInfo(msg,infofn);
293     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
294     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
295     else CmiSyncNodeSendAndFree(node, len, msg);
296   }
297 }
298
299
300 void CldGraphModuleInit(char **argv)
301 {
302   CpvInitialize(CldProcInfo, CldData);
303   CpvInitialize(int, CldAskLoadHandlerIndex);
304   CpvInitialize(int, CldAckNoTaskHandlerIndex);
305   CpvInitialize(int, CldBalanceHandlerIndex);
306
307   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
308 #if CMK_TRACE_ENABLED
309   CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
310   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
311   CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
312 #endif
313
314   CpvAccess(CldBalanceHandlerIndex) = 
315     CmiRegisterHandler(CldBalanceHandler);
316   CpvAccess(CldAskLoadHandlerIndex) = 
317     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
318   
319   CpvAccess(CldAckNoTaskHandlerIndex) = 
320     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
321
322   /* communication thread */
323   if (CmiMyRank() == CmiMyNodeSize())  return;
324
325   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
326  
327   if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
328   {
329       CmiAssert(WS_Threshold>=0);
330   }
331
332   _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
333
334   _steal_prio = CmiGetArgFlagDesc(argv, "+WSPriority", "Charm++> Work Stealing, using priority");
335
336   /* register idle handlers - when idle, keep asking work from neighbors */
337   if(CmiNumPes() > 1)
338     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
339       (CcdVoidFn) CldBeginIdle, NULL);
340   if(WS_Threshold >= 0 && CmiMyPe() == 0)
341       CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
342 #if CMK_IMMEDIATE_MSG
343   if(_steal_immediate && CmiMyPe() == 0)
344       CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
345 #endif
346 }
347
348
349 void CldModuleInit(char **argv)
350 {
351   CpvInitialize(int, CldHandlerIndex);
352   CpvInitialize(int, CldRelocatedMessages);
353   CpvInitialize(int, CldLoadBalanceMessages);
354   CpvInitialize(int, CldMessageChunks);
355   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
356   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
357   CpvAccess(CldMessageChunks) = 0;
358
359   CldModuleGeneralInit(argv);
360   CldGraphModuleInit(argv);
361
362   CpvAccess(CldLoadNotify) = 1;
363
364   CpvInitialize(int, isStealing);
365   CpvAccess(isStealing) = 0;
366 }
367
368 void CldCallback(void)
369 {}