Add a workstealing seed balancer
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          1
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 20                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   double lastCheck;
20   int    sent;                  /* flag to disable idle work request */
21   int    balanceEvt;            /* user event for balancing */
22   int    idleEvt;               /* user event for idle balancing */
23   int    idleprocEvt;           /* user event for processing idle req */
24   double lastBalanceTime;
25 } *CldProcInfo;
26
27
28 CpvStaticDeclare(CldProcInfo, CldData);
29 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
30
31 void LoadNotifyFn(int l)
32 {
33   CldProcInfo  cldData = CpvAccess(CldData);
34   cldData->sent = 0;
35 }
36
37 char *CldGetStrategy(void)
38 {
39   return "work stealing";
40 }
41
42 /* since I am idle, ask for work from neighbors */
43 static void CldBeginIdle(void *dummy)
44 {
45   CpvAccess(CldData)->lastCheck = CmiWallTimer();
46 }
47
48 static void CldEndIdle(void *dummy)
49 {
50   CpvAccess(CldData)->lastCheck = -1;
51 }
52
53 static void CldStillIdle(void *dummy, double curT)
54 {
55   int i;
56   double startT;
57   requestmsg msg;
58   int myload;
59   CldProcInfo  cldData = CpvAccess(CldData);
60   int  victim;
61
62   double now = curT;
63   double lt = cldData->lastCheck;
64   /* only ask for work every 20ms */
65   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
66   cldData->lastCheck = now;
67
68   myload = CldLoad();
69   if (myload > 0) return;
70
71   msg.from_pe = CmiMyPe();
72
73   victim = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
74
75   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
76   /* fixme */
77   //CmiBecomeImmediate(&msg);
78   msg.to_rank = CmiRankOf(victim);
79   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
80   cldData->sent = 1;
81
82 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
83   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
84 #endif
85 }
86
87 /* immediate message handler, work at node level */
88 /* send some work to requested proc */
89 static void CldAskLoadHandler(requestmsg *msg)
90 {
91   int receiver, rank, recvIdx, i;
92   int myload = CldLoad();
93   double now = CmiWallTimer();
94   CldProcInfo  cldData = CpvAccess(CldData);
95
96   /* only give you work if I have more than 1 */
97   if (myload>0) {
98     int sendLoad;
99     receiver = msg->from_pe;
100     rank = CmiMyRank();
101     if (msg->to_rank != -1) rank = msg->to_rank;
102     sendLoad = myload / 2; 
103     CmiFree(msg);
104     if (sendLoad < 1) return;
105     CldMultipleSend(receiver, sendLoad, rank, 0);
106   }
107 }
108
109 void CldHandler(void *msg)
110 {
111   CldInfoFn ifn; CldPackFn pfn;
112   int len, queueing, priobits; unsigned int *prioptr;
113   
114   CldRestoreHandler(msg);
115   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
116   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
117   /*CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
118   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
119 }
120
121 void CldBalanceHandler(void *msg)
122 {
123   CldRestoreHandler(msg);
124   CldPutToken(msg);
125 }
126
127 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
128 {
129   int len, queueing, priobits,i; unsigned int *prioptr;
130   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
131   CldPackFn pfn;
132   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
133   if (pfn) {
134     pfn(&msg);
135     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
136   }
137   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
138   CmiSetInfo(msg,infofn);
139
140   CmiSyncMulticastAndFree(grp, len, msg);
141 }
142
143 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
144 {
145   int len, queueing, priobits,i; unsigned int *prioptr;
146   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
147   CldPackFn pfn;
148   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
149   if (pfn) {
150     pfn(&msg);
151     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
152   }
153   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
154   CmiSetInfo(msg,infofn);
155   CmiSyncListSendAndFree(npes, pes, len, msg);
156 }
157
158 void CldEnqueue(int pe, void *msg, int infofn)
159 {
160   int len, queueing, priobits, avg; unsigned int *prioptr;
161   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
162   CldPackFn pfn;
163
164   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
165       pe = CmiMyPe();
166     /* always pack the message because the message may be move away
167        to a different processor later by CldGetToken() */
168     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
169     if (pfn && CmiNumNodes()>1) {
170        pfn(&msg);
171        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
172     }
173     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
174     CmiSetInfo(msg,infofn);
175     CldPutToken(msg);
176   } 
177   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
178     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
179     //CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
180     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
181   }
182   else {
183     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
184     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
185       pfn(&msg);
186       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
187     }
188     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
189     CmiSetInfo(msg,infofn);
190     if (pe==CLD_BROADCAST) 
191       CmiSyncBroadcastAndFree(len, msg);
192     else if (pe==CLD_BROADCAST_ALL)
193       CmiSyncBroadcastAllAndFree(len, msg);
194     else CmiSyncSendAndFree(pe, len, msg);
195   }
196 }
197
198 void CldNodeEnqueue(int node, void *msg, int infofn)
199 {
200   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
201   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
202   CldPackFn pfn;
203   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
204       pe = CmiMyPe();
205       node = CmiNodeOf(pe);
206       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
207       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
208   }
209   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
210     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
211     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
212   } 
213   else {
214     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
215     if (pfn) {
216         pfn(&msg);
217         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
218     }
219     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
220     CmiSetInfo(msg,infofn);
221     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
222     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
223     else CmiSyncNodeSendAndFree(node, len, msg);
224   }
225 }
226
227
228 void CldGraphModuleInit(char **argv)
229 {
230   CpvInitialize(CldProcInfo, CldData);
231   CpvInitialize(int, CldLoadResponseHandlerIndex);
232   CpvInitialize(int, CldAskLoadHandlerIndex);
233   CpvInitialize(int, CldBalanceHandlerIndex);
234
235   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
236   CpvAccess(CldData)->lastCheck = -1;
237   CpvAccess(CldData)->sent = 0;
238 #if CMK_TRACE_ENABLED
239   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
240   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
241   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
242 #endif
243
244   CpvAccess(CldBalanceHandlerIndex) = 
245     CmiRegisterHandler(CldBalanceHandler);
246   CpvAccess(CldAskLoadHandlerIndex) = 
247     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
248
249   /* communication thread */
250   if (CmiMyRank() == CmiMyNodeSize())  return;
251
252 #if 1
253   /* register idle handlers - when idle, keep asking work from neighbors */
254   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
255       (CcdVoidFn) CldStillIdle, NULL);
256   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
257       (CcdVoidFn) CldStillIdle, NULL);
258     if (CmiMyPe() == 0) 
259       CmiPrintf("Charm++> Work stealing is enabled. \n");
260 #endif
261 }
262
263
264 void CldModuleInit(char **argv)
265 {
266   CpvInitialize(int, CldHandlerIndex);
267   CpvInitialize(int, CldRelocatedMessages);
268   CpvInitialize(int, CldLoadBalanceMessages);
269   CpvInitialize(int, CldMessageChunks);
270   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
271   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
272   CpvAccess(CldMessageChunks) = 0;
273
274   CldModuleGeneralInit(argv);
275   CldGraphModuleInit(argv);
276
277   CpvAccess(CldLoadNotify) = 1;
278 }
279
280 void CldCallback()
281 {}