Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   //double lastCheck;
20   //int    sent;                        /* flag to disable idle work request */
21   int    balanceEvt;            /* user event for balancing */
22   int    idleEvt;               /* user event for idle balancing */
23   int    idleprocEvt;           /* user event for processing idle req */
24   //double lastBalanceTime;
25 } *CldProcInfo;
26
27 int _stealonly1 = 0;
28
29 CpvStaticDeclare(CldProcInfo, CldData);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
32
33 void LoadNotifyFn(int l)
34 {
35   //CldProcInfo  cldData = CpvAccess(CldData);
36   //cldData->sent = 0;
37 }
38
39 char *CldGetStrategy(void)
40 {
41   return "work stealing";
42 }
43
44 /* since I am idle, ask for work from neighbors */
45
46 static void CldBeginIdle(void *dummy)
47 {
48   int i;
49   double startT;
50   requestmsg msg;
51   int myload;
52   CldProcInfo  cldData = CpvAccess(CldData);
53   int  victim;
54   int mype;
55   int numpes;
56
57   CcdRaiseCondition(CcdUSER);
58
59   myload = CldLoad();
60   //if (myload > 0) return; // I do not think this will be true when a processor is idle. overhead code
61
62   mype = CmiMyPe();
63   msg.from_pe = mype;
64   numpes = CmiNumPes();
65   do{
66       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
67   }while(victim == mype);
68
69   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
70 #if IDLE_IMMEDIATE
71   /* fixme */
72   CmiBecomeImmediate(&msg);
73 #endif
74   msg.to_rank = CmiRankOf(victim);
75   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
76   
77   //cldData->sent = 1;
78 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
79   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
80 #endif
81 }
82
83 /* immediate message handler, work at node level */
84 /* send some work to requested proc */
85 static void CldAskLoadHandler(requestmsg *msg)
86 {
87   int receiver, rank, recvIdx, i;
88   int myload = CldLoad();
89   CldProcInfo  cldData = CpvAccess(CldData);
90
91   int sendLoad;
92   sendLoad = myload / 2; 
93   receiver = msg->from_pe;
94   /* only give you work if I have more than 1 */
95   if (myload>LOADTHRESH) {
96       if(_stealonly1) sendLoad = 1;
97       rank = CmiMyRank();
98       if (msg->to_rank != -1) rank = msg->to_rank;
99       CldMultipleSend(receiver, sendLoad, rank, 0);
100   }else
101   {
102       requestmsg r_msg;
103       r_msg.from_pe = CmiMyPe();
104       r_msg.to_rank = CmiMyRank();
105
106       CcdRaiseCondition(CcdUSER);
107
108       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
109       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
110     /* send ack indicating there is no task */
111   }
112   CmiFree(msg);
113 }
114
115 void  CldAckNoTaskHandler(requestmsg *msg)
116 {
117   int victim; 
118   int notaskpe = msg->from_pe;
119   CldProcInfo  cldData = CpvAccess(CldData);
120   int mype = CmiMyPe();
121
122   CcdRaiseCondition(CcdUSER);
123
124   do{
125       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
126   }while(victim == mype || victim == notaskpe);
127
128   /* fixme */
129   //CmiBecomeImmediate(&msg);
130   /* reuse msg */
131   msg->to_rank = CmiRankOf(victim);
132   msg->from_pe = mype;
133   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
134   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
135   //cldData->sent = 1;
136
137   //cldData->lastCheck = CmiWallTimer();
138
139 }
140
141 void CldHandler(void *msg)
142 {
143   CldInfoFn ifn; CldPackFn pfn;
144   int len, queueing, priobits; unsigned int *prioptr;
145   
146   CldRestoreHandler(msg);
147   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
148   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
149   /*CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
150   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
151 }
152
153 void CldBalanceHandler(void *msg)
154 {
155   CldRestoreHandler(msg);
156   CldPutToken(msg);
157 }
158
159 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
160 {
161   int len, queueing, priobits,i; unsigned int *prioptr;
162   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
163   CldPackFn pfn;
164   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
165   if (pfn) {
166     pfn(&msg);
167     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
168   }
169   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
170   CmiSetInfo(msg,infofn);
171
172   CmiSyncMulticastAndFree(grp, len, msg);
173 }
174
175 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
176 {
177   int len, queueing, priobits,i; unsigned int *prioptr;
178   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
179   CldPackFn pfn;
180   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
181   if (pfn) {
182     pfn(&msg);
183     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
184   }
185   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
186   CmiSetInfo(msg,infofn);
187   CmiSyncListSendAndFree(npes, pes, len, msg);
188 }
189
190 void CldEnqueue(int pe, void *msg, int infofn)
191 {
192   int len, queueing, priobits, avg; unsigned int *prioptr;
193   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
194   CldPackFn pfn;
195
196   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
197       pe = CmiMyPe();
198     /* always pack the message because the message may be move away
199        to a different processor later by CldGetToken() */
200     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
201     if (pfn && CmiNumNodes()>1) {
202        pfn(&msg);
203        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
204     }
205     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
206     CmiSetInfo(msg,infofn);
207     CldPutToken(msg);
208   } 
209   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
210     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
211     //CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
212     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
213   }
214   else {
215     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
216     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
217       pfn(&msg);
218       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
219     }
220     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
221     CmiSetInfo(msg,infofn);
222     if (pe==CLD_BROADCAST) 
223       CmiSyncBroadcastAndFree(len, msg);
224     else if (pe==CLD_BROADCAST_ALL)
225       CmiSyncBroadcastAllAndFree(len, msg);
226     else CmiSyncSendAndFree(pe, len, msg);
227   }
228 }
229
230 void CldNodeEnqueue(int node, void *msg, int infofn)
231 {
232   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
233   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
234   CldPackFn pfn;
235   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
236       pe = CmiMyPe();
237       node = CmiNodeOf(pe);
238       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
239       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
240   }
241   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
242     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
243     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
244   } 
245   else {
246     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
247     if (pfn) {
248         pfn(&msg);
249         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
250     }
251     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
252     CmiSetInfo(msg,infofn);
253     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
254     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
255     else CmiSyncNodeSendAndFree(node, len, msg);
256   }
257 }
258
259
260 void CldGraphModuleInit(char **argv)
261 {
262   CpvInitialize(CldProcInfo, CldData);
263   CpvInitialize(int, CldAskLoadHandlerIndex);
264   CpvInitialize(int, CldAckNoTaskHandlerIndex);
265   CpvInitialize(int, CldBalanceHandlerIndex);
266
267   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
268   //CpvAccess(CldData)->lastCheck = -1;
269   //CpvAccess(CldData)->sent = 0;
270 #if CMK_TRACE_ENABLED
271   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
272   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
273   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
274 #endif
275
276   CpvAccess(CldBalanceHandlerIndex) = 
277     CmiRegisterHandler(CldBalanceHandler);
278   CpvAccess(CldAskLoadHandlerIndex) = 
279     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
280   
281   CpvAccess(CldAckNoTaskHandlerIndex) = 
282     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
283
284   /* communication thread */
285   if (CmiMyRank() == CmiMyNodeSize())  return;
286
287   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
288
289 #if 1
290   /* register idle handlers - when idle, keep asking work from neighbors */
291   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
292       (CcdVoidFn) CldBeginIdle, NULL);
293     if (CmiMyPe() == 0) 
294       CmiPrintf("Charm++> Work stealing is enabled. \n");
295 #endif
296 }
297
298
299 void CldModuleInit(char **argv)
300 {
301   CpvInitialize(int, CldHandlerIndex);
302   CpvInitialize(int, CldRelocatedMessages);
303   CpvInitialize(int, CldLoadBalanceMessages);
304   CpvInitialize(int, CldMessageChunks);
305   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
306   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
307   CpvAccess(CldMessageChunks) = 0;
308
309   CldModuleGeneralInit(argv);
310   CldGraphModuleInit(argv);
311
312   CpvAccess(CldLoadNotify) = 1;
313 }
314
315 void CldCallback()
316 {}