changed for immediate msg
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define TRACE_USEREVENTS        1
9 #define LOADTHRESH              3
10
11 typedef struct CldProcInfo_s {
12   int    askEvt;                /* user event for askLoad */
13   int    askNoEvt;              /* user event for askNoLoad */
14   int    idleEvt;               /* user event for idle balancing */
15 } *CldProcInfo;
16
17 static int WS_Threshold = -1;
18 static int _stealonly1 = 0;
19 static int _steal_immediate = 0;
20 static int workstealingproactive = 0;
21
22 CpvStaticDeclare(CldProcInfo, CldData);
23 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
24 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
25 CpvStaticDeclare(int, isStealing);
26
27
28 char *CldGetStrategy(void)
29 {
30   return "work stealing";
31 }
32
33
34 static void StealLoad()
35 {
36   int i;
37   double now;
38   requestmsg msg;
39   int  victim;
40   int mype;
41   int numpes;
42
43   if (CpvAccess(isStealing)) return;    /* already stealing, return */
44
45   CpvAccess(isStealing) = 1;
46
47 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
48   now = CmiWallTimer();
49 #endif
50
51   mype = CmiMyPe();
52   msg.from_pe = mype;
53   numpes = CmiNumPes();
54   do{
55       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
56   }while(victim == mype);
57
58   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
59 #if CMK_IMMEDIATE_MSG
60   /* fixme */
61   if (_steal_immediate) CmiBecomeImmediate(&msg);
62 #endif
63   /* msg.to_rank = CmiRankOf(victim); */
64   msg.to_pe = victim;
65   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
66   
67 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
68   traceUserBracketEvent(CpvAccess(CldData)->idleEvt, now, CmiWallTimer());
69 #endif
70 }
71
72 void LoadNotifyFn(int l)
73 {
74     if(CldCountTokens() <= WS_Threshold)
75         StealLoad();
76 }
77
78 /* since I am idle, ask for work from neighbors */
79 static void CldBeginIdle(void *dummy)
80 {
81     //if (CldCountTokens() == 0) StealLoad();
82     CmiAssert(CldCountTokens()==0);
83     StealLoad();
84 }
85
86 /* immediate message handler, work at node level */
87 /* send some work to requested proc */
88 static void CldAskLoadHandler(requestmsg *msg)
89 {
90   int receiver, rank, recvIdx, i;
91   int myload, sendLoad;
92   double now;
93   /* int myload = CldLoad(); */
94
95 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
96   now = CmiWallTimer();
97 #endif
98
99   /* rank = msg->to_rank; */
100   CmiAssert(msg->to_pe!=-1);
101   rank = CmiRankOf(msg->to_pe);
102   CmiAssert(rank!=-1);
103   myload = CldCountTokensRank(rank);
104
105   receiver = msg->from_pe;
106   /* only give you work if I have more than 1 */
107   if (myload>LOADTHRESH) {
108       if(_stealonly1) sendLoad = 1;
109       else sendLoad = myload/2; 
110       if(sendLoad > 0)
111           CldMultipleSend(receiver, sendLoad, rank, 0);
112       CmiFree(msg);
113   }else
114   {     /* send ack indicating there is no task */
115       int pe = msg->to_pe;
116       msg->to_pe = msg->from_pe;
117       msg->from_pe = pe;
118       /*msg->to_rank = CmiMyRank(); */
119
120       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
121       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
122   }
123
124 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
125   traceUserBracketEvent(CpvAccess(CldData)->askEvt, now, CmiWallTimer());
126 #endif
127 }
128
129 void  CldAckNoTaskHandler(requestmsg *msg)
130 {
131   double now;
132   int victim; 
133   /* int notaskpe = msg->from_pe; */
134   int mype = CmiMyPe();
135   int numpes = CmiNumPes();
136
137 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
138   now = CmiWallTimer();
139 #endif
140
141   do{
142       /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
143       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
144   } while(victim == mype);
145
146   /* reuse msg */
147 #if CMK_IMMEDIATE_MSG
148   /* fixme */
149   if (_steal_immediate) CmiBecomeImmediate(msg);
150 #endif
151   /*msg->to_rank = CmiRankOf(victim); */
152   msg->to_pe = victim;
153   msg->from_pe = mype;
154   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
155   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
156
157   CpvAccess(isStealing) = 1;
158
159 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
160   traceUserBracketEvent(CpvAccess(CldData)->askNoEvt, now, CmiWallTimer());
161 #endif
162 }
163
164 void CldHandler(void *msg)
165 {
166   CldInfoFn ifn; CldPackFn pfn;
167   int len, queueing, priobits; unsigned int *prioptr;
168   
169   CldRestoreHandler(msg);
170   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
171   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
172   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
173   /* CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
174 }
175
176 void CldBalanceHandler(void *msg)
177 {
178   CldRestoreHandler(msg);
179   CldPutToken(msg);
180   CpvAccess(isStealing) = 0;      /* fixme: this may not be right */
181 }
182
183 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
184 {
185   int len, queueing, priobits,i; unsigned int *prioptr;
186   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
187   CldPackFn pfn;
188   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
189   if (pfn) {
190     pfn(&msg);
191     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
192   }
193   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
194   CmiSetInfo(msg,infofn);
195
196   CmiSyncMulticastAndFree(grp, len, msg);
197 }
198
199 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
200 {
201   int len, queueing, priobits,i; unsigned int *prioptr;
202   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
203   CldPackFn pfn;
204   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
205   if (pfn) {
206     pfn(&msg);
207     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
208   }
209   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
210   CmiSetInfo(msg,infofn);
211   CmiSyncListSendAndFree(npes, pes, len, msg);
212 }
213
214 void CldEnqueue(int pe, void *msg, int infofn)
215 {
216   int len, queueing, priobits, avg; unsigned int *prioptr;
217   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
218   CldPackFn pfn;
219
220   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
221       pe = CmiMyPe();
222     /* always pack the message because the message may be move away
223        to a different processor later by CldGetToken() */
224     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
225     if (pfn && CmiNumNodes()>1) {
226        pfn(&msg);
227        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
228     }
229     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
230     CmiSetInfo(msg,infofn);
231     CldPutToken(msg);
232   } 
233   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
234     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
235     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
236   }
237   else {
238     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
239     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
240       pfn(&msg);
241       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
242     }
243     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
244     CmiSetInfo(msg,infofn);
245     if (pe==CLD_BROADCAST) 
246       CmiSyncBroadcastAndFree(len, msg);
247     else if (pe==CLD_BROADCAST_ALL)
248       CmiSyncBroadcastAllAndFree(len, msg);
249     else CmiSyncSendAndFree(pe, len, msg);
250   }
251 }
252
253 void CldNodeEnqueue(int node, void *msg, int infofn)
254 {
255   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
256   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
257   CldPackFn pfn;
258   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
259       pe = CmiMyPe();
260       node = CmiNodeOf(pe);
261       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
262       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
263   }
264   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
265     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
266     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
267   } 
268   else {
269     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
270     if (pfn) {
271         pfn(&msg);
272         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
273     }
274     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
275     CmiSetInfo(msg,infofn);
276     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
277     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
278     else CmiSyncNodeSendAndFree(node, len, msg);
279   }
280 }
281
282
283 void CldGraphModuleInit(char **argv)
284 {
285   CpvInitialize(CldProcInfo, CldData);
286   CpvInitialize(int, CldAskLoadHandlerIndex);
287   CpvInitialize(int, CldAckNoTaskHandlerIndex);
288   CpvInitialize(int, CldBalanceHandlerIndex);
289
290   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
291 #if CMK_TRACE_ENABLED
292   CpvAccess(CldData)->askEvt = traceRegisterUserEvent("CldAskLoad", -1);
293   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("StealLoad", -1);
294   CpvAccess(CldData)->askNoEvt = traceRegisterUserEvent("CldAckNoTask", -1);
295 #endif
296
297   CpvAccess(CldBalanceHandlerIndex) = 
298     CmiRegisterHandler(CldBalanceHandler);
299   CpvAccess(CldAskLoadHandlerIndex) = 
300     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
301   
302   CpvAccess(CldAckNoTaskHandlerIndex) = 
303     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
304
305   /* communication thread */
306   if (CmiMyRank() == CmiMyNodeSize())  return;
307
308   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
309  
310   if(CmiGetArgIntDesc(argv, "+WSThreshold", &WS_Threshold, "The number of minimum load before stealing"))
311   {
312       CmiAssert(WS_Threshold>=0);
313   }
314
315   _steal_immediate = CmiGetArgFlagDesc(argv, "+WSImmediate", "Charm++> Work Stealing, steal using immediate messages");
316
317   /* register idle handlers - when idle, keep asking work from neighbors */
318   if(CmiNumPes() > 1)
319     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
320       (CcdVoidFn) CldBeginIdle, NULL);
321   if(WS_Threshold >= 0 && CmiMyPe() == 0)
322       CmiPrintf("Charm++> Steal work when load is fewer than %d. \n", WS_Threshold);
323 #if CMK_IMMEDIATE_MSG
324   if(_steal_immediate && CmiMyPe() == 0)
325       CmiPrintf("Charm++> Steal work using immediate messages. \n", WS_Threshold);
326 #endif
327 }
328
329
330 void CldModuleInit(char **argv)
331 {
332   CpvInitialize(int, CldHandlerIndex);
333   CpvInitialize(int, CldRelocatedMessages);
334   CpvInitialize(int, CldLoadBalanceMessages);
335   CpvInitialize(int, CldMessageChunks);
336   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
337   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
338   CpvAccess(CldMessageChunks) = 0;
339
340   CldModuleGeneralInit(argv);
341   CldGraphModuleInit(argv);
342
343   CpvAccess(CldLoadNotify) = 1;
344
345   CpvInitialize(int, isStealing);
346   CpvAccess(isStealing) = 0;
347 }
348
349 void CldCallback()
350 {}