change the formular in finding the next victim (get rid of notaskpe)
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25 int workstealingproactive = 0;
26
27 CpvStaticDeclare(CldProcInfo, CldData);
28 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
29 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
30 CpvStaticDeclare(int, isStealing);
31
32
33 char *CldGetStrategy(void)
34 {
35   return "work stealing";
36 }
37
38
39 static void StealLoad()
40 {
41   int i;
42   double startT;
43   requestmsg msg;
44   int myload;
45   int  victim;
46   int mype;
47   int numpes;
48
49   /* CcdRaiseCondition(CcdUSER); */
50
51   if (CpvAccess(isStealing)) return;    /* already stealing, return */
52   CpvAccess(isStealing) = 1;
53
54   myload = CldLoad();
55
56   mype = CmiMyPe();
57   msg.from_pe = mype;
58   numpes = CmiNumPes();
59   do{
60       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
61   }while(victim == mype);
62
63   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
64 #if IDLE_IMMEDIATE
65   /* fixme */
66   CmiBecomeImmediate(&msg);
67 #endif
68   msg.to_rank = CmiRankOf(victim);
69   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
70   
71 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
72   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
73 #endif
74 }
75
76 void LoadNotifyFn(int l)
77 {
78     if(workstealingproactive)
79     {
80         if(CldLoad() < 3)
81             StealLoad();
82     }
83 }
84 /* since I am idle, ask for work from neighbors */
85
86 static void CldBeginIdle(void *dummy)
87 {
88     StealLoad();
89
90 }
91 /* immediate message handler, work at node level */
92 /* send some work to requested proc */
93 static void CldAskLoadHandler(requestmsg *msg)
94 {
95   int receiver, rank, recvIdx, i;
96   int myload = CldLoad();
97
98   int sendLoad;
99   sendLoad = myload / 2; 
100   receiver = msg->from_pe;
101   /* only give you work if I have more than 1 */
102   if (myload>LOADTHRESH) {
103       if(_stealonly1) sendLoad = 1;
104       rank = CmiMyRank();
105       if (msg->to_rank != -1) rank = msg->to_rank;
106       CldMultipleSend(receiver, sendLoad, rank, 0);
107   }else
108   {
109       msg->from_pe = CmiMyPe();
110       msg->to_rank = CmiMyRank();
111
112       /* CcdRaiseCondition(CcdUSER); */
113
114       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
115       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
116     /* send ack indicating there is no task */
117   }
118 }
119
120 void  CldAckNoTaskHandler(requestmsg *msg)
121 {
122   int victim; 
123   int notaskpe = msg->from_pe;
124   int mype = CmiMyPe();
125
126   /* CcdRaiseCondition(CcdUSER); */
127
128   if (CmiMyPe()==2) victim = 2-mype;
129   else
130   do{
131       /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
132       victim = (((CrnRand())&0x7FFFFFFF)%CmiNumPes());
133   }while(victim == mype || victim == notaskpe);
134
135   /* reuse msg */
136   msg->to_rank = CmiRankOf(victim);
137   msg->from_pe = mype;
138   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
139   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
140
141   CpvAccess(isStealing) = 1;
142 }
143
144 void CldHandler(void *msg)
145 {
146   CldInfoFn ifn; CldPackFn pfn;
147   int len, queueing, priobits; unsigned int *prioptr;
148   
149   CldRestoreHandler(msg);
150   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
151   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
152   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
153 }
154
155 void CldBalanceHandler(void *msg)
156 {
157   CldRestoreHandler(msg);
158   CldPutToken(msg);
159   CpvAccess(isStealing) = 0;
160 }
161
162 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
163 {
164   int len, queueing, priobits,i; unsigned int *prioptr;
165   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
166   CldPackFn pfn;
167   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
168   if (pfn) {
169     pfn(&msg);
170     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
171   }
172   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
173   CmiSetInfo(msg,infofn);
174
175   CmiSyncMulticastAndFree(grp, len, msg);
176 }
177
178 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
179 {
180   int len, queueing, priobits,i; unsigned int *prioptr;
181   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
182   CldPackFn pfn;
183   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
184   if (pfn) {
185     pfn(&msg);
186     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
187   }
188   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
189   CmiSetInfo(msg,infofn);
190   CmiSyncListSendAndFree(npes, pes, len, msg);
191 }
192
193 void CldEnqueue(int pe, void *msg, int infofn)
194 {
195   int len, queueing, priobits, avg; unsigned int *prioptr;
196   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
197   CldPackFn pfn;
198
199   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
200       pe = CmiMyPe();
201     /* always pack the message because the message may be move away
202        to a different processor later by CldGetToken() */
203     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
204     if (pfn && CmiNumNodes()>1) {
205        pfn(&msg);
206        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
207     }
208     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
209     CmiSetInfo(msg,infofn);
210     CldPutToken(msg);
211   } 
212   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
213     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
214     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
215   }
216   else {
217     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
218     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
219       pfn(&msg);
220       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
221     }
222     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
223     CmiSetInfo(msg,infofn);
224     if (pe==CLD_BROADCAST) 
225       CmiSyncBroadcastAndFree(len, msg);
226     else if (pe==CLD_BROADCAST_ALL)
227       CmiSyncBroadcastAllAndFree(len, msg);
228     else CmiSyncSendAndFree(pe, len, msg);
229   }
230 }
231
232 void CldNodeEnqueue(int node, void *msg, int infofn)
233 {
234   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
235   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
236   CldPackFn pfn;
237   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
238       pe = CmiMyPe();
239       node = CmiNodeOf(pe);
240       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
241       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
242   }
243   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
244     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
245     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
246   } 
247   else {
248     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
249     if (pfn) {
250         pfn(&msg);
251         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
252     }
253     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
254     CmiSetInfo(msg,infofn);
255     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
256     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
257     else CmiSyncNodeSendAndFree(node, len, msg);
258   }
259 }
260
261
262 void CldGraphModuleInit(char **argv)
263 {
264   CpvInitialize(CldProcInfo, CldData);
265   CpvInitialize(int, CldAskLoadHandlerIndex);
266   CpvInitialize(int, CldAckNoTaskHandlerIndex);
267   CpvInitialize(int, CldBalanceHandlerIndex);
268
269   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
270 #if CMK_TRACE_ENABLED
271   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
272   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
273   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
274 #endif
275
276   CpvAccess(CldBalanceHandlerIndex) = 
277     CmiRegisterHandler(CldBalanceHandler);
278   CpvAccess(CldAskLoadHandlerIndex) = 
279     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
280   
281   CpvAccess(CldAckNoTaskHandlerIndex) = 
282     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
283
284   /* communication thread */
285   if (CmiMyRank() == CmiMyNodeSize())  return;
286
287   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
288   
289   workstealingproactive= CmiGetArgFlagDesc(argv, "+workstealingproactive", "Charm++> Work Stealing, steal before going idle(threshold = 3)");
290
291   /* register idle handlers - when idle, keep asking work from neighbors */
292   if(CmiNumPes() > 1)
293     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
294       (CcdVoidFn) CldBeginIdle, NULL);
295   if (CmiMyPe() == 0) 
296       CmiPrintf("Charm++> Work stealing is enabled. \n");
297   if(workstealingproactive && CmiMyPe() == 0)
298       CmiPrintf("Charm++> Steal work when load is fewer than 3. \n");
299 }
300
301
302 void CldModuleInit(char **argv)
303 {
304   CpvInitialize(int, CldHandlerIndex);
305   CpvInitialize(int, CldRelocatedMessages);
306   CpvInitialize(int, CldLoadBalanceMessages);
307   CpvInitialize(int, CldMessageChunks);
308   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
309   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
310   CpvAccess(CldMessageChunks) = 0;
311
312   CldModuleGeneralInit(argv);
313   CldGraphModuleInit(argv);
314
315   CpvAccess(CldLoadNotify) = 1;
316
317   CpvInitialize(int, isStealing);
318   CpvAccess(isStealing) = 0;
319 }
320
321 void CldCallback()
322 {}