Merge branch 'charm' of /expand/home/gioachin/charm into charm
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          1
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   double lastCheck;
20   int    sent;                  /* flag to disable idle work request */
21   int    balanceEvt;            /* user event for balancing */
22   int    idleEvt;               /* user event for idle balancing */
23   int    idleprocEvt;           /* user event for processing idle req */
24   double lastBalanceTime;
25 } *CldProcInfo;
26
27 int _stealonly1 = 0;
28
29 CpvStaticDeclare(CldProcInfo, CldData);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
32
33 void LoadNotifyFn(int l)
34 {
35   CldProcInfo  cldData = CpvAccess(CldData);
36   cldData->sent = 0;
37 }
38
39 char *CldGetStrategy(void)
40 {
41   return "work stealing";
42 }
43
44 /* since I am idle, ask for work from neighbors */
45 static void CldBeginIdle(void *dummy)
46 {
47   CpvAccess(CldData)->lastCheck = CmiWallTimer();
48 }
49
50 static void CldEndIdle(void *dummy)
51 {
52   CpvAccess(CldData)->lastCheck = -1;
53 }
54
55 static void CldStillIdle(void *dummy, double curT)
56 {
57   int i;
58   double startT;
59   requestmsg msg;
60   int myload;
61   CldProcInfo  cldData = CpvAccess(CldData);
62   int  victim;
63   int mype;
64
65   double now = curT;
66   double lt = cldData->lastCheck;
67   /* only ask for work every 20ms */
68   if (cldData->sent && (lt!=-1 && now-lt< PERIOD*0.001)) return;
69   cldData->lastCheck = now;
70
71   myload = CldLoad();
72   if (myload > 0) return;
73
74   msg.from_pe = CmiMyPe();
75   mype = CmiMyPe();
76
77   do{
78       victim = (((CrnRand()+mype)&0x7FFFFFFF)%CmiNumPes());
79   }while(victim == mype);
80
81   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
82   /* fixme */
83   //CmiBecomeImmediate(&msg);
84   msg.to_rank = CmiRankOf(victim);
85   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
86   cldData->sent = 1;
87
88 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
89   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
90 #endif
91 }
92
93 /* immediate message handler, work at node level */
94 /* send some work to requested proc */
95 static void CldAskLoadHandler(requestmsg *msg)
96 {
97   int receiver, rank, recvIdx, i;
98   int myload = CldLoad();
99   double now = CmiWallTimer();
100   CldProcInfo  cldData = CpvAccess(CldData);
101
102   int sendLoad;
103   sendLoad = myload / 2; 
104   receiver = msg->from_pe;
105   /* only give you work if I have more than 1 */
106   if (sendLoad>0) {
107       if(_stealonly1) sendLoad = 1;
108       rank = CmiMyRank();
109       if (msg->to_rank != -1) rank = msg->to_rank;
110       CldMultipleSend(receiver, sendLoad, rank, 0);
111   }else
112   {
113       requestmsg r_msg;
114       r_msg.from_pe = CmiMyPe();
115       r_msg.to_rank = CmiMyRank();
116
117       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
118       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
119     /* send ack indicating there is no task */
120   }
121   CmiFree(msg);
122 }
123
124 void  CldAckNoTaskHandler(requestmsg *msg)
125 {
126     int victim; 
127     requestmsg r_msg;
128     int notaskpe = msg->from_pe;
129     CldProcInfo  cldData = CpvAccess(CldData);
130     int mype = CmiMyPe();
131   do{
132       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
133   }while(victim == mype || victim == notaskpe);
134
135   /* fixme */
136   //CmiBecomeImmediate(&msg);
137   r_msg.to_rank = CmiRankOf(victim);
138   r_msg.from_pe = mype;
139   CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
140   CmiSyncSend(victim, sizeof(requestmsg),(char *)&r_msg);
141   cldData->sent = 1;
142
143   cldData->lastCheck = CmiWallTimer();
144
145   CmiFree(msg);
146
147 }
148 void CldHandler(void *msg)
149 {
150   CldInfoFn ifn; CldPackFn pfn;
151   int len, queueing, priobits; unsigned int *prioptr;
152   
153   CldRestoreHandler(msg);
154   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
155   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
156   /*CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
157   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
158 }
159
160 void CldBalanceHandler(void *msg)
161 {
162   CldRestoreHandler(msg);
163   CldPutToken(msg);
164 }
165
166 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
167 {
168   int len, queueing, priobits,i; unsigned int *prioptr;
169   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
170   CldPackFn pfn;
171   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
172   if (pfn) {
173     pfn(&msg);
174     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
175   }
176   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
177   CmiSetInfo(msg,infofn);
178
179   CmiSyncMulticastAndFree(grp, len, msg);
180 }
181
182 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
183 {
184   int len, queueing, priobits,i; unsigned int *prioptr;
185   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
186   CldPackFn pfn;
187   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
188   if (pfn) {
189     pfn(&msg);
190     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
191   }
192   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
193   CmiSetInfo(msg,infofn);
194   CmiSyncListSendAndFree(npes, pes, len, msg);
195 }
196
197 void CldEnqueue(int pe, void *msg, int infofn)
198 {
199   int len, queueing, priobits, avg; unsigned int *prioptr;
200   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
201   CldPackFn pfn;
202
203   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
204       pe = CmiMyPe();
205     /* always pack the message because the message may be move away
206        to a different processor later by CldGetToken() */
207     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
208     if (pfn && CmiNumNodes()>1) {
209        pfn(&msg);
210        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
211     }
212     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
213     CmiSetInfo(msg,infofn);
214     CldPutToken(msg);
215   } 
216   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
217     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
218     //CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
219     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
220   }
221   else {
222     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
223     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
224       pfn(&msg);
225       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
226     }
227     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
228     CmiSetInfo(msg,infofn);
229     if (pe==CLD_BROADCAST) 
230       CmiSyncBroadcastAndFree(len, msg);
231     else if (pe==CLD_BROADCAST_ALL)
232       CmiSyncBroadcastAllAndFree(len, msg);
233     else CmiSyncSendAndFree(pe, len, msg);
234   }
235 }
236
237 void CldNodeEnqueue(int node, void *msg, int infofn)
238 {
239   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
240   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
241   CldPackFn pfn;
242   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
243       pe = CmiMyPe();
244       node = CmiNodeOf(pe);
245       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
246       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
247   }
248   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
249     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
250     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
251   } 
252   else {
253     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
254     if (pfn) {
255         pfn(&msg);
256         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
257     }
258     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
259     CmiSetInfo(msg,infofn);
260     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
261     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
262     else CmiSyncNodeSendAndFree(node, len, msg);
263   }
264 }
265
266
267 void CldGraphModuleInit(char **argv)
268 {
269   CpvInitialize(CldProcInfo, CldData);
270   CpvInitialize(int, CldAskLoadHandlerIndex);
271   CpvInitialize(int, CldAckNoTaskHandlerIndex);
272   CpvInitialize(int, CldBalanceHandlerIndex);
273
274   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
275   CpvAccess(CldData)->lastCheck = -1;
276   CpvAccess(CldData)->sent = 0;
277 #if CMK_TRACE_ENABLED
278   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
279   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
280   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
281 #endif
282
283   CpvAccess(CldBalanceHandlerIndex) = 
284     CmiRegisterHandler(CldBalanceHandler);
285   CpvAccess(CldAskLoadHandlerIndex) = 
286     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
287   
288   CpvAccess(CldAckNoTaskHandlerIndex) = 
289     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
290
291   /* communication thread */
292   if (CmiMyRank() == CmiMyNodeSize())  return;
293
294   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
295
296 #if 1
297   /* register idle handlers - when idle, keep asking work from neighbors */
298   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
299       (CcdVoidFn) CldStillIdle, NULL);
300   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
301       (CcdVoidFn) CldStillIdle, NULL);
302     if (CmiMyPe() == 0) 
303       CmiPrintf("Charm++> Work stealing is enabled. \n");
304 #endif
305 }
306
307
308 void CldModuleInit(char **argv)
309 {
310   CpvInitialize(int, CldHandlerIndex);
311   CpvInitialize(int, CldRelocatedMessages);
312   CpvInitialize(int, CldLoadBalanceMessages);
313   CpvInitialize(int, CldMessageChunks);
314   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
315   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
316   CpvAccess(CldMessageChunks) = 0;
317
318   CldModuleGeneralInit(argv);
319   CldGraphModuleInit(argv);
320
321   CpvAccess(CldLoadNotify) = 1;
322 }
323
324 void CldCallback()
325 {}