Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25
26 CpvStaticDeclare(CldProcInfo, CldData);
27 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
28 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
29 CpvStaticDeclare(int, isStealing);
30
31 void LoadNotifyFn(int l)
32 {
33 }
34
35 char *CldGetStrategy(void)
36 {
37   return "work stealing";
38 }
39
40 /* since I am idle, ask for work from neighbors */
41
42 static void CldBeginIdle(void *dummy)
43 {
44   int i;
45   double startT;
46   requestmsg msg;
47   int myload;
48   int  victim;
49   int mype;
50   int numpes;
51
52   CcdRaiseCondition(CcdUSER);
53
54   if (CpvAccess(isStealing)) return;    /* already stealing, return */
55   CpvAccess(isStealing) = 1;
56
57   myload = CldLoad();
58
59   mype = CmiMyPe();
60   msg.from_pe = mype;
61   numpes = CmiNumPes();
62   do{
63       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
64   }while(victim == mype);
65
66   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
67 #if IDLE_IMMEDIATE
68   /* fixme */
69   CmiBecomeImmediate(&msg);
70 #endif
71   msg.to_rank = CmiRankOf(victim);
72   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
73   
74 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
75   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
76 #endif
77 }
78
79 /* immediate message handler, work at node level */
80 /* send some work to requested proc */
81 static void CldAskLoadHandler(requestmsg *msg)
82 {
83   int receiver, rank, recvIdx, i;
84   int myload = CldLoad();
85
86   int sendLoad;
87   sendLoad = myload / 2; 
88   receiver = msg->from_pe;
89   /* only give you work if I have more than 1 */
90   if (myload>LOADTHRESH) {
91       if(_stealonly1) sendLoad = 1;
92       rank = CmiMyRank();
93       if (msg->to_rank != -1) rank = msg->to_rank;
94       CldMultipleSend(receiver, sendLoad, rank, 0);
95   }else
96   {
97       requestmsg r_msg;
98       r_msg.from_pe = CmiMyPe();
99       r_msg.to_rank = CmiMyRank();
100
101       CcdRaiseCondition(CcdUSER);
102
103       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
104       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
105     /* send ack indicating there is no task */
106   }
107   CmiFree(msg);
108 }
109
110 void  CldAckNoTaskHandler(requestmsg *msg)
111 {
112   int victim; 
113   int notaskpe = msg->from_pe;
114   int mype = CmiMyPe();
115
116   CcdRaiseCondition(CcdUSER);
117
118   do{
119       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
120   }while(victim == mype);
121
122   /* reuse msg */
123   msg->to_rank = CmiRankOf(victim);
124   msg->from_pe = mype;
125   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
126   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
127
128   CpvAccess(isStealing) = 1;
129 }
130
131 void CldHandler(void *msg)
132 {
133   CldInfoFn ifn; CldPackFn pfn;
134   int len, queueing, priobits; unsigned int *prioptr;
135   
136   CldRestoreHandler(msg);
137   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
138   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
139   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
140 }
141
142 void CldBalanceHandler(void *msg)
143 {
144   CldRestoreHandler(msg);
145   CldPutToken(msg);
146   CpvAccess(isStealing) = 0;
147 }
148
149 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
150 {
151   int len, queueing, priobits,i; unsigned int *prioptr;
152   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
153   CldPackFn pfn;
154   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
155   if (pfn) {
156     pfn(&msg);
157     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
158   }
159   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
160   CmiSetInfo(msg,infofn);
161
162   CmiSyncMulticastAndFree(grp, len, msg);
163 }
164
165 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
166 {
167   int len, queueing, priobits,i; unsigned int *prioptr;
168   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
169   CldPackFn pfn;
170   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
171   if (pfn) {
172     pfn(&msg);
173     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
174   }
175   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
176   CmiSetInfo(msg,infofn);
177   CmiSyncListSendAndFree(npes, pes, len, msg);
178 }
179
180 void CldEnqueue(int pe, void *msg, int infofn)
181 {
182   int len, queueing, priobits, avg; unsigned int *prioptr;
183   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
184   CldPackFn pfn;
185
186   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
187       pe = CmiMyPe();
188     /* always pack the message because the message may be move away
189        to a different processor later by CldGetToken() */
190     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
191     if (pfn && CmiNumNodes()>1) {
192        pfn(&msg);
193        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
194     }
195     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
196     CmiSetInfo(msg,infofn);
197     CldPutToken(msg);
198   } 
199   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
200     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
201     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
202   }
203   else {
204     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
205     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
206       pfn(&msg);
207       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
208     }
209     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
210     CmiSetInfo(msg,infofn);
211     if (pe==CLD_BROADCAST) 
212       CmiSyncBroadcastAndFree(len, msg);
213     else if (pe==CLD_BROADCAST_ALL)
214       CmiSyncBroadcastAllAndFree(len, msg);
215     else CmiSyncSendAndFree(pe, len, msg);
216   }
217 }
218
219 void CldNodeEnqueue(int node, void *msg, int infofn)
220 {
221   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
222   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
223   CldPackFn pfn;
224   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
225       pe = CmiMyPe();
226       node = CmiNodeOf(pe);
227       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
228       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
229   }
230   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
231     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
232     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
233   } 
234   else {
235     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
236     if (pfn) {
237         pfn(&msg);
238         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
239     }
240     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
241     CmiSetInfo(msg,infofn);
242     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
243     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
244     else CmiSyncNodeSendAndFree(node, len, msg);
245   }
246 }
247
248
249 void CldGraphModuleInit(char **argv)
250 {
251   CpvInitialize(CldProcInfo, CldData);
252   CpvInitialize(int, CldAskLoadHandlerIndex);
253   CpvInitialize(int, CldAckNoTaskHandlerIndex);
254   CpvInitialize(int, CldBalanceHandlerIndex);
255
256   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
257 #if CMK_TRACE_ENABLED
258   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
259   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
260   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
261 #endif
262
263   CpvAccess(CldBalanceHandlerIndex) = 
264     CmiRegisterHandler(CldBalanceHandler);
265   CpvAccess(CldAskLoadHandlerIndex) = 
266     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
267   
268   CpvAccess(CldAckNoTaskHandlerIndex) = 
269     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
270
271   /* communication thread */
272   if (CmiMyRank() == CmiMyNodeSize())  return;
273
274   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
275
276   /* register idle handlers - when idle, keep asking work from neighbors */
277   if(CmiNumPes() > 1)
278   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
279       (CcdVoidFn) CldBeginIdle, NULL);
280   if (CmiMyPe() == 0) 
281       CmiPrintf("Charm++> Work stealing is enabled. \n");
282 }
283
284
285 void CldModuleInit(char **argv)
286 {
287   CpvInitialize(int, CldHandlerIndex);
288   CpvInitialize(int, CldRelocatedMessages);
289   CpvInitialize(int, CldLoadBalanceMessages);
290   CpvInitialize(int, CldMessageChunks);
291   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
292   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
293   CpvAccess(CldMessageChunks) = 0;
294
295   CldModuleGeneralInit(argv);
296   CldGraphModuleInit(argv);
297
298   CpvAccess(CldLoadNotify) = 1;
299
300   CpvInitialize(int, isStealing);
301   CpvAccess(isStealing) = 0;
302 }
303
304 void CldCallback()
305 {}