add workstealing proactive option to make stealing more proactive
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25 int workstealingproactive = 0;
26
27 CpvStaticDeclare(CldProcInfo, CldData);
28 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
29 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
30 CpvStaticDeclare(int, isStealing);
31
32
33 char *CldGetStrategy(void)
34 {
35   return "work stealing";
36 }
37
38 void LoadNotifyFn(int l)
39 {
40     if(workstealingproactive)
41     {
42         if(CldLoad() < 3)
43             StealLoad();
44     }
45 }
46 /* since I am idle, ask for work from neighbors */
47
48 static void CldBeginIdle(void *dummy)
49 {
50     StealLoad();
51
52 }
53 static inline void StealLoad()
54 {
55   int i;
56   double startT;
57   requestmsg msg;
58   int myload;
59   int  victim;
60   int mype;
61   int numpes;
62
63   /* CcdRaiseCondition(CcdUSER); */
64
65   if (CpvAccess(isStealing)) return;    /* already stealing, return */
66   CpvAccess(isStealing) = 1;
67
68   myload = CldLoad();
69
70   mype = CmiMyPe();
71   msg.from_pe = mype;
72   numpes = CmiNumPes();
73   do{
74       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
75   }while(victim == mype);
76
77   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
78 #if IDLE_IMMEDIATE
79   /* fixme */
80   CmiBecomeImmediate(&msg);
81 #endif
82   msg.to_rank = CmiRankOf(victim);
83   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
84   
85 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
86   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
87 #endif
88 }
89
90 /* immediate message handler, work at node level */
91 /* send some work to requested proc */
92 static void CldAskLoadHandler(requestmsg *msg)
93 {
94   int receiver, rank, recvIdx, i;
95   int myload = CldLoad();
96
97   int sendLoad;
98   sendLoad = myload / 2; 
99   receiver = msg->from_pe;
100   /* only give you work if I have more than 1 */
101   if (myload>LOADTHRESH) {
102       if(_stealonly1) sendLoad = 1;
103       rank = CmiMyRank();
104       if (msg->to_rank != -1) rank = msg->to_rank;
105       CldMultipleSend(receiver, sendLoad, rank, 0);
106   }else
107   {
108       msg->from_pe = CmiMyPe();
109       msg->to_rank = CmiMyRank();
110
111       /* CcdRaiseCondition(CcdUSER); */
112
113       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
114       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
115     /* send ack indicating there is no task */
116   }
117 }
118
119 void  CldAckNoTaskHandler(requestmsg *msg)
120 {
121   int victim; 
122   int notaskpe = msg->from_pe;
123   int mype = CmiMyPe();
124
125   /* CcdRaiseCondition(CcdUSER); */
126
127   do{
128       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
129   }while(victim == mype);
130
131   /* reuse msg */
132   msg->to_rank = CmiRankOf(victim);
133   msg->from_pe = mype;
134   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
135   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
136
137   CpvAccess(isStealing) = 1;
138 }
139
140 void CldHandler(void *msg)
141 {
142   CldInfoFn ifn; CldPackFn pfn;
143   int len, queueing, priobits; unsigned int *prioptr;
144   
145   CldRestoreHandler(msg);
146   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
147   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
148   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
149 }
150
151 void CldBalanceHandler(void *msg)
152 {
153   CldRestoreHandler(msg);
154   CldPutToken(msg);
155   CpvAccess(isStealing) = 0;
156 }
157
158 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
159 {
160   int len, queueing, priobits,i; unsigned int *prioptr;
161   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
162   CldPackFn pfn;
163   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
164   if (pfn) {
165     pfn(&msg);
166     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
167   }
168   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
169   CmiSetInfo(msg,infofn);
170
171   CmiSyncMulticastAndFree(grp, len, msg);
172 }
173
174 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
175 {
176   int len, queueing, priobits,i; unsigned int *prioptr;
177   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
178   CldPackFn pfn;
179   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
180   if (pfn) {
181     pfn(&msg);
182     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
183   }
184   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
185   CmiSetInfo(msg,infofn);
186   CmiSyncListSendAndFree(npes, pes, len, msg);
187 }
188
189 void CldEnqueue(int pe, void *msg, int infofn)
190 {
191   int len, queueing, priobits, avg; unsigned int *prioptr;
192   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
193   CldPackFn pfn;
194
195   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
196       pe = CmiMyPe();
197     /* always pack the message because the message may be move away
198        to a different processor later by CldGetToken() */
199     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
200     if (pfn && CmiNumNodes()>1) {
201        pfn(&msg);
202        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
203     }
204     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
205     CmiSetInfo(msg,infofn);
206     CldPutToken(msg);
207   } 
208   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
209     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
210     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
211   }
212   else {
213     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
214     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
215       pfn(&msg);
216       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
217     }
218     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
219     CmiSetInfo(msg,infofn);
220     if (pe==CLD_BROADCAST) 
221       CmiSyncBroadcastAndFree(len, msg);
222     else if (pe==CLD_BROADCAST_ALL)
223       CmiSyncBroadcastAllAndFree(len, msg);
224     else CmiSyncSendAndFree(pe, len, msg);
225   }
226 }
227
228 void CldNodeEnqueue(int node, void *msg, int infofn)
229 {
230   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
231   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
232   CldPackFn pfn;
233   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
234       pe = CmiMyPe();
235       node = CmiNodeOf(pe);
236       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
237       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
238   }
239   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
240     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
241     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
242   } 
243   else {
244     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
245     if (pfn) {
246         pfn(&msg);
247         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
248     }
249     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
250     CmiSetInfo(msg,infofn);
251     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
252     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
253     else CmiSyncNodeSendAndFree(node, len, msg);
254   }
255 }
256
257
258 void CldGraphModuleInit(char **argv)
259 {
260   CpvInitialize(CldProcInfo, CldData);
261   CpvInitialize(int, CldAskLoadHandlerIndex);
262   CpvInitialize(int, CldAckNoTaskHandlerIndex);
263   CpvInitialize(int, CldBalanceHandlerIndex);
264
265   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
266 #if CMK_TRACE_ENABLED
267   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
268   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
269   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
270 #endif
271
272   CpvAccess(CldBalanceHandlerIndex) = 
273     CmiRegisterHandler(CldBalanceHandler);
274   CpvAccess(CldAskLoadHandlerIndex) = 
275     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
276   
277   CpvAccess(CldAckNoTaskHandlerIndex) = 
278     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
279
280   /* communication thread */
281   if (CmiMyRank() == CmiMyNodeSize())  return;
282
283   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
284   
285   workstealingproactive= CmiGetArgFlagDesc(argv, "+workstealingproactive", "Charm++> Work Stealing, steal before going idle(threshold = 3)");
286
287   /* register idle handlers - when idle, keep asking work from neighbors */
288   if(CmiNumPes() > 1)
289   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
290       (CcdVoidFn) CldBeginIdle, NULL);
291   if (CmiMyPe() == 0) 
292       CmiPrintf("Charm++> Work stealing is enabled. \n");
293 }
294
295
296 void CldModuleInit(char **argv)
297 {
298   CpvInitialize(int, CldHandlerIndex);
299   CpvInitialize(int, CldRelocatedMessages);
300   CpvInitialize(int, CldLoadBalanceMessages);
301   CpvInitialize(int, CldMessageChunks);
302   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
303   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
304   CpvAccess(CldMessageChunks) = 0;
305
306   CldModuleGeneralInit(argv);
307   CldGraphModuleInit(argv);
308
309   CpvAccess(CldLoadNotify) = 1;
310
311   CpvInitialize(int, isStealing);
312   CpvAccess(isStealing) = 0;
313 }
314
315 void CldCallback()
316 {}