Merge branch 'charm' of charmgit:charm into bhatele/ldbcputimer
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25
26 CpvStaticDeclare(CldProcInfo, CldData);
27 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
28 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
29 CpvStaticDeclare(int, isStealing);
30
31 void LoadNotifyFn(int l)
32 {
33 }
34
35 char *CldGetStrategy(void)
36 {
37   return "work stealing";
38 }
39
40 /* since I am idle, ask for work from neighbors */
41
42 static void CldBeginIdle(void *dummy)
43 {
44   int i;
45   double startT;
46   requestmsg msg;
47   int myload;
48   int  victim;
49   int mype;
50   int numpes;
51
52   /* CcdRaiseCondition(CcdUSER); */
53
54   if (CpvAccess(isStealing)) return;    /* already stealing, return */
55   CpvAccess(isStealing) = 1;
56
57   myload = CldLoad();
58
59   mype = CmiMyPe();
60   msg.from_pe = mype;
61   numpes = CmiNumPes();
62   do{
63       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
64   }while(victim == mype);
65
66   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
67 #if IDLE_IMMEDIATE
68   /* fixme */
69   CmiBecomeImmediate(&msg);
70 #endif
71   msg.to_rank = CmiRankOf(victim);
72   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
73   
74 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
75   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
76 #endif
77 }
78
79 /* immediate message handler, work at node level */
80 /* send some work to requested proc */
81 static void CldAskLoadHandler(requestmsg *msg)
82 {
83   int receiver, rank, recvIdx, i;
84   int myload = CldLoad();
85
86   int sendLoad;
87   sendLoad = myload / 2; 
88   receiver = msg->from_pe;
89   /* only give you work if I have more than 1 */
90   if (myload>LOADTHRESH) {
91       if(_stealonly1) sendLoad = 1;
92       rank = CmiMyRank();
93       if (msg->to_rank != -1) rank = msg->to_rank;
94       CldMultipleSend(receiver, sendLoad, rank, 0);
95   }else
96   {
97       msg->from_pe = CmiMyPe();
98       msg->to_rank = CmiMyRank();
99
100       /* CcdRaiseCondition(CcdUSER); */
101
102       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
103       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
104     /* send ack indicating there is no task */
105   }
106 }
107
108 void  CldAckNoTaskHandler(requestmsg *msg)
109 {
110   int victim; 
111   int notaskpe = msg->from_pe;
112   int mype = CmiMyPe();
113
114   /* CcdRaiseCondition(CcdUSER); */
115
116   do{
117       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
118   }while(victim == mype);
119
120   /* reuse msg */
121   msg->to_rank = CmiRankOf(victim);
122   msg->from_pe = mype;
123   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
124   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
125
126   CpvAccess(isStealing) = 1;
127 }
128
129 void CldHandler(void *msg)
130 {
131   CldInfoFn ifn; CldPackFn pfn;
132   int len, queueing, priobits; unsigned int *prioptr;
133   
134   CldRestoreHandler(msg);
135   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
136   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
137   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
138 }
139
140 void CldBalanceHandler(void *msg)
141 {
142   CldRestoreHandler(msg);
143   CldPutToken(msg);
144   CpvAccess(isStealing) = 0;
145 }
146
147 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
148 {
149   int len, queueing, priobits,i; unsigned int *prioptr;
150   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
151   CldPackFn pfn;
152   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
153   if (pfn) {
154     pfn(&msg);
155     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
156   }
157   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
158   CmiSetInfo(msg,infofn);
159
160   CmiSyncMulticastAndFree(grp, len, msg);
161 }
162
163 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
164 {
165   int len, queueing, priobits,i; unsigned int *prioptr;
166   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
167   CldPackFn pfn;
168   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
169   if (pfn) {
170     pfn(&msg);
171     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
172   }
173   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
174   CmiSetInfo(msg,infofn);
175   CmiSyncListSendAndFree(npes, pes, len, msg);
176 }
177
178 void CldEnqueue(int pe, void *msg, int infofn)
179 {
180   int len, queueing, priobits, avg; unsigned int *prioptr;
181   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
182   CldPackFn pfn;
183
184   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
185       pe = CmiMyPe();
186     /* always pack the message because the message may be move away
187        to a different processor later by CldGetToken() */
188     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
189     if (pfn && CmiNumNodes()>1) {
190        pfn(&msg);
191        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
192     }
193     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
194     CmiSetInfo(msg,infofn);
195     CldPutToken(msg);
196   } 
197   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
198     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
199     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
200   }
201   else {
202     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
203     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
204       pfn(&msg);
205       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
206     }
207     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
208     CmiSetInfo(msg,infofn);
209     if (pe==CLD_BROADCAST) 
210       CmiSyncBroadcastAndFree(len, msg);
211     else if (pe==CLD_BROADCAST_ALL)
212       CmiSyncBroadcastAllAndFree(len, msg);
213     else CmiSyncSendAndFree(pe, len, msg);
214   }
215 }
216
217 void CldNodeEnqueue(int node, void *msg, int infofn)
218 {
219   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
220   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
221   CldPackFn pfn;
222   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
223       pe = CmiMyPe();
224       node = CmiNodeOf(pe);
225       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
226       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
227   }
228   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
229     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
230     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
231   } 
232   else {
233     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
234     if (pfn) {
235         pfn(&msg);
236         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
237     }
238     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
239     CmiSetInfo(msg,infofn);
240     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
241     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
242     else CmiSyncNodeSendAndFree(node, len, msg);
243   }
244 }
245
246
247 void CldGraphModuleInit(char **argv)
248 {
249   CpvInitialize(CldProcInfo, CldData);
250   CpvInitialize(int, CldAskLoadHandlerIndex);
251   CpvInitialize(int, CldAckNoTaskHandlerIndex);
252   CpvInitialize(int, CldBalanceHandlerIndex);
253
254   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
255 #if CMK_TRACE_ENABLED
256   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
257   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
258   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
259 #endif
260
261   CpvAccess(CldBalanceHandlerIndex) = 
262     CmiRegisterHandler(CldBalanceHandler);
263   CpvAccess(CldAskLoadHandlerIndex) = 
264     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
265   
266   CpvAccess(CldAckNoTaskHandlerIndex) = 
267     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
268
269   /* communication thread */
270   if (CmiMyRank() == CmiMyNodeSize())  return;
271
272   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
273
274   /* register idle handlers - when idle, keep asking work from neighbors */
275   if(CmiNumPes() > 1)
276   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
277       (CcdVoidFn) CldBeginIdle, NULL);
278   if (CmiMyPe() == 0) 
279       CmiPrintf("Charm++> Work stealing is enabled. \n");
280 }
281
282
283 void CldModuleInit(char **argv)
284 {
285   CpvInitialize(int, CldHandlerIndex);
286   CpvInitialize(int, CldRelocatedMessages);
287   CpvInitialize(int, CldLoadBalanceMessages);
288   CpvInitialize(int, CldMessageChunks);
289   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
290   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
291   CpvAccess(CldMessageChunks) = 0;
292
293   CldModuleGeneralInit(argv);
294   CldGraphModuleInit(argv);
295
296   CpvAccess(CldLoadNotify) = 1;
297
298   CpvInitialize(int, isStealing);
299   CpvAccess(isStealing) = 0;
300 }
301
302 void CldCallback()
303 {}