fix a bug in seed load balancer about begin idle
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          1
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   double lastCheck;
20   int    sent;                  /* flag to disable idle work request */
21   int    balanceEvt;            /* user event for balancing */
22   int    idleEvt;               /* user event for idle balancing */
23   int    idleprocEvt;           /* user event for processing idle req */
24   double lastBalanceTime;
25 } *CldProcInfo;
26
27 int _stealonly1 = 0;
28
29 CpvStaticDeclare(CldProcInfo, CldData);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
32
33 void LoadNotifyFn(int l)
34 {
35   CldProcInfo  cldData = CpvAccess(CldData);
36   cldData->sent = 0;
37 }
38
39 char *CldGetStrategy(void)
40 {
41   return "work stealing";
42 }
43
44 /* since I am idle, ask for work from neighbors */
45
46
47 static void CldBeginIdle(void *dummy)
48 {
49   int i;
50   double startT;
51   requestmsg msg;
52   int myload;
53   CldProcInfo  cldData = CpvAccess(CldData);
54   int  victim;
55   int mype;
56   int numpes;
57
58   myload = CldLoad();
59   if (myload > 0) return;
60
61   mype = CmiMyPe();
62   msg.from_pe = mype;
63   numpes = CmiNumPes();
64   do{
65       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
66   }while(victim == mype);
67
68   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
69 #if IDLE_IMMEDIATE
70   /* fixme */
71   CmiBecomeImmediate(&msg);
72 #endif
73   msg.to_rank = CmiRankOf(victim);
74   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
75
76 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
77   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
78 #endif
79 }
80
81 /* immediate message handler, work at node level */
82 /* send some work to requested proc */
83 static void CldAskLoadHandler(requestmsg *msg)
84 {
85   int receiver, rank, recvIdx, i;
86   int myload = CldLoad();
87   double now = CmiWallTimer();
88   CldProcInfo  cldData = CpvAccess(CldData);
89
90   int sendLoad;
91   sendLoad = myload / 2; 
92   receiver = msg->from_pe;
93   /* only give you work if I have more than 1 */
94   if (myload>LOADTHRESH) {
95       if(_stealonly1) sendLoad = 1;
96       rank = CmiMyRank();
97       if (msg->to_rank != -1) rank = msg->to_rank;
98       CldMultipleSend(receiver, sendLoad, rank, 0);
99   }else
100   {
101       requestmsg r_msg;
102       r_msg.from_pe = CmiMyPe();
103       r_msg.to_rank = CmiMyRank();
104
105       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
106       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
107     /* send ack indicating there is no task */
108   }
109   CmiFree(msg);
110 }
111
112 void  CldAckNoTaskHandler(requestmsg *msg)
113 {
114     int victim; 
115     requestmsg r_msg;
116     int notaskpe = msg->from_pe;
117     CldProcInfo  cldData = CpvAccess(CldData);
118     int mype = CmiMyPe();
119   do{
120       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
121   }while(victim == mype || victim == notaskpe);
122
123   /* fixme */
124   //CmiBecomeImmediate(&msg);
125   r_msg.to_rank = CmiRankOf(victim);
126   r_msg.from_pe = mype;
127   CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
128   CmiSyncSend(victim, sizeof(requestmsg),(char *)&r_msg);
129   cldData->sent = 1;
130
131   cldData->lastCheck = CmiWallTimer();
132
133   CmiFree(msg);
134
135 }
136 void CldHandler(void *msg)
137 {
138   CldInfoFn ifn; CldPackFn pfn;
139   int len, queueing, priobits; unsigned int *prioptr;
140   
141   CldRestoreHandler(msg);
142   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
143   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
144   /*CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
145   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
146 }
147
148 void CldBalanceHandler(void *msg)
149 {
150   CldRestoreHandler(msg);
151   CldPutToken(msg);
152 }
153
154 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
155 {
156   int len, queueing, priobits,i; unsigned int *prioptr;
157   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
158   CldPackFn pfn;
159   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
160   if (pfn) {
161     pfn(&msg);
162     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
163   }
164   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
165   CmiSetInfo(msg,infofn);
166
167   CmiSyncMulticastAndFree(grp, len, msg);
168 }
169
170 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
171 {
172   int len, queueing, priobits,i; unsigned int *prioptr;
173   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
174   CldPackFn pfn;
175   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
176   if (pfn) {
177     pfn(&msg);
178     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
179   }
180   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
181   CmiSetInfo(msg,infofn);
182   CmiSyncListSendAndFree(npes, pes, len, msg);
183 }
184
185 void CldEnqueue(int pe, void *msg, int infofn)
186 {
187   int len, queueing, priobits, avg; unsigned int *prioptr;
188   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
189   CldPackFn pfn;
190
191   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
192       pe = CmiMyPe();
193     /* always pack the message because the message may be move away
194        to a different processor later by CldGetToken() */
195     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
196     if (pfn && CmiNumNodes()>1) {
197        pfn(&msg);
198        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
199     }
200     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
201     CmiSetInfo(msg,infofn);
202     CldPutToken(msg);
203   } 
204   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
205     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
206     //CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
207     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
208   }
209   else {
210     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
211     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
212       pfn(&msg);
213       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
214     }
215     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
216     CmiSetInfo(msg,infofn);
217     if (pe==CLD_BROADCAST) 
218       CmiSyncBroadcastAndFree(len, msg);
219     else if (pe==CLD_BROADCAST_ALL)
220       CmiSyncBroadcastAllAndFree(len, msg);
221     else CmiSyncSendAndFree(pe, len, msg);
222   }
223 }
224
225 void CldNodeEnqueue(int node, void *msg, int infofn)
226 {
227   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
228   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
229   CldPackFn pfn;
230   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
231       pe = CmiMyPe();
232       node = CmiNodeOf(pe);
233       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
234       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
235   }
236   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
237     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
238     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
239   } 
240   else {
241     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
242     if (pfn) {
243         pfn(&msg);
244         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
245     }
246     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
247     CmiSetInfo(msg,infofn);
248     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
249     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
250     else CmiSyncNodeSendAndFree(node, len, msg);
251   }
252 }
253
254
255 void CldGraphModuleInit(char **argv)
256 {
257   CpvInitialize(CldProcInfo, CldData);
258   CpvInitialize(int, CldAskLoadHandlerIndex);
259   CpvInitialize(int, CldAckNoTaskHandlerIndex);
260   CpvInitialize(int, CldBalanceHandlerIndex);
261
262   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
263   CpvAccess(CldData)->lastCheck = -1;
264   CpvAccess(CldData)->sent = 0;
265 #if CMK_TRACE_ENABLED
266   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
267   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
268   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
269 #endif
270
271   CpvAccess(CldBalanceHandlerIndex) = 
272     CmiRegisterHandler(CldBalanceHandler);
273   CpvAccess(CldAskLoadHandlerIndex) = 
274     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
275   
276   CpvAccess(CldAckNoTaskHandlerIndex) = 
277     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
278
279   /* communication thread */
280   if (CmiMyRank() == CmiMyNodeSize())  return;
281
282   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
283
284 #if 1
285   /* register idle handlers - when idle, keep asking work from neighbors */
286   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
287       (CcdVoidFn) CldBeginIdle, NULL);
288     if (CmiMyPe() == 0) 
289       CmiPrintf("Charm++> Work stealing is enabled. \n");
290 #endif
291 }
292
293
294 void CldModuleInit(char **argv)
295 {
296   CpvInitialize(int, CldHandlerIndex);
297   CpvInitialize(int, CldRelocatedMessages);
298   CpvInitialize(int, CldLoadBalanceMessages);
299   CpvInitialize(int, CldMessageChunks);
300   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
301   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
302   CpvAccess(CldMessageChunks) = 0;
303
304   CldModuleGeneralInit(argv);
305   CldGraphModuleInit(argv);
306
307   CpvAccess(CldLoadNotify) = 1;
308 }
309
310 void CldCallback()
311 {}