minor change to make charm output uniform.
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25 int workstealingproactive = 0;
26
27 CpvStaticDeclare(CldProcInfo, CldData);
28 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
29 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
30 CpvStaticDeclare(int, isStealing);
31
32
33 char *CldGetStrategy(void)
34 {
35   return "work stealing";
36 }
37
38
39 static void StealLoad()
40 {
41   int i;
42   double startT;
43   requestmsg msg;
44   int myload;
45   int  victim;
46   int mype;
47   int numpes;
48
49   /* CcdRaiseCondition(CcdUSER); */
50
51   if (CpvAccess(isStealing)) return;    /* already stealing, return */
52
53   //myload = CldLoad();
54   myload = CldCountTokens();
55   if(myload>0) return;
56
57   CpvAccess(isStealing) = 1;
58
59   mype = CmiMyPe();
60   msg.from_pe = mype;
61   numpes = CmiNumPes();
62   do{
63       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
64   }while(victim == mype);
65
66   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
67 #if IDLE_IMMEDIATE
68   /* fixme */
69   CmiBecomeImmediate(&msg);
70 #endif
71   msg.to_rank = CmiRankOf(victim);
72   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
73   
74 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
75   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
76 #endif
77 }
78
79 void LoadNotifyFn(int l)
80 {
81     if(workstealingproactive)
82     {
83         if(CldCountTokens() < 3)
84             StealLoad();
85     }
86 }
87 /* since I am idle, ask for work from neighbors */
88
89 static void CldBeginIdle(void *dummy)
90 {
91     StealLoad();
92 }
93
94 /* immediate message handler, work at node level */
95 /* send some work to requested proc */
96 static void CldAskLoadHandler(requestmsg *msg)
97 {
98   int receiver, rank, recvIdx, i;
99   //int myload = CldLoad();
100   int myload = CldCountTokens();
101
102   int sendLoad;
103   sendLoad = myload / 2; 
104   receiver = msg->from_pe;
105   /* only give you work if I have more than 1 */
106   if (myload>LOADTHRESH) {
107       if(_stealonly1) sendLoad = 1;
108       rank = CmiMyRank();
109       if (msg->to_rank != -1) rank = msg->to_rank;
110       CldMultipleSend(receiver, sendLoad, rank, 0);
111   }else
112   {
113       msg->from_pe = CmiMyPe();
114       msg->to_rank = CmiMyRank();
115
116       /* CcdRaiseCondition(CcdUSER); */
117
118       CmiSetHandler(msg, CpvAccess(CldAckNoTaskHandlerIndex));
119       CmiSyncSendAndFree(receiver, sizeof(requestmsg),(char *)msg);
120     /* send ack indicating there is no task */
121   }
122 }
123
124 void  CldAckNoTaskHandler(requestmsg *msg)
125 {
126   int victim; 
127   int notaskpe = msg->from_pe;
128   int mype = CmiMyPe();
129   int numpes = CmiNumPes();
130
131   /* CcdRaiseCondition(CcdUSER); */
132
133   do{
134       /*victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());*/
135       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
136   }while(victim == mype);
137
138   /* reuse msg */
139   msg->to_rank = CmiRankOf(victim);
140   msg->from_pe = mype;
141   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
142   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
143
144   CpvAccess(isStealing) = 1;
145 }
146
147 void CldHandler(void *msg)
148 {
149   CldInfoFn ifn; CldPackFn pfn;
150   int len, queueing, priobits; unsigned int *prioptr;
151   
152   CldRestoreHandler(msg);
153   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
154   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
155   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
156 }
157
158 void CldBalanceHandler(void *msg)
159 {
160   CldRestoreHandler(msg);
161   CldPutToken(msg);
162   CpvAccess(isStealing) = 0;
163 }
164
165 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
166 {
167   int len, queueing, priobits,i; unsigned int *prioptr;
168   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
169   CldPackFn pfn;
170   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
171   if (pfn) {
172     pfn(&msg);
173     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
174   }
175   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
176   CmiSetInfo(msg,infofn);
177
178   CmiSyncMulticastAndFree(grp, len, msg);
179 }
180
181 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
182 {
183   int len, queueing, priobits,i; unsigned int *prioptr;
184   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
185   CldPackFn pfn;
186   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
187   if (pfn) {
188     pfn(&msg);
189     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
190   }
191   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
192   CmiSetInfo(msg,infofn);
193   CmiSyncListSendAndFree(npes, pes, len, msg);
194 }
195
196 void CldEnqueue(int pe, void *msg, int infofn)
197 {
198   int len, queueing, priobits, avg; unsigned int *prioptr;
199   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
200   CldPackFn pfn;
201
202   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
203       pe = CmiMyPe();
204     /* always pack the message because the message may be move away
205        to a different processor later by CldGetToken() */
206     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
207     if (pfn && CmiNumNodes()>1) {
208        pfn(&msg);
209        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
210     }
211     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
212     CmiSetInfo(msg,infofn);
213     CldPutToken(msg);
214   } 
215   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
216     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
217     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
218   }
219   else {
220     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
221     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
222       pfn(&msg);
223       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
224     }
225     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
226     CmiSetInfo(msg,infofn);
227     if (pe==CLD_BROADCAST) 
228       CmiSyncBroadcastAndFree(len, msg);
229     else if (pe==CLD_BROADCAST_ALL)
230       CmiSyncBroadcastAllAndFree(len, msg);
231     else CmiSyncSendAndFree(pe, len, msg);
232   }
233 }
234
235 void CldNodeEnqueue(int node, void *msg, int infofn)
236 {
237   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
238   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
239   CldPackFn pfn;
240   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
241       pe = CmiMyPe();
242       node = CmiNodeOf(pe);
243       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
244       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
245   }
246   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
247     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
248     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
249   } 
250   else {
251     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
252     if (pfn) {
253         pfn(&msg);
254         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
255     }
256     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
257     CmiSetInfo(msg,infofn);
258     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
259     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
260     else CmiSyncNodeSendAndFree(node, len, msg);
261   }
262 }
263
264
265 void CldGraphModuleInit(char **argv)
266 {
267   CpvInitialize(CldProcInfo, CldData);
268   CpvInitialize(int, CldAskLoadHandlerIndex);
269   CpvInitialize(int, CldAckNoTaskHandlerIndex);
270   CpvInitialize(int, CldBalanceHandlerIndex);
271
272   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
273 #if CMK_TRACE_ENABLED
274   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
275   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
276   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
277 #endif
278
279   CpvAccess(CldBalanceHandlerIndex) = 
280     CmiRegisterHandler(CldBalanceHandler);
281   CpvAccess(CldAskLoadHandlerIndex) = 
282     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
283   
284   CpvAccess(CldAckNoTaskHandlerIndex) = 
285     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
286
287   /* communication thread */
288   if (CmiMyRank() == CmiMyNodeSize())  return;
289
290   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
291   
292   workstealingproactive= CmiGetArgFlagDesc(argv, "+workstealingproactive", "Charm++> Work Stealing, steal before going idle(threshold = 3)");
293
294   /* register idle handlers - when idle, keep asking work from neighbors */
295   if(CmiNumPes() > 1)
296     CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
297       (CcdVoidFn) CldBeginIdle, NULL);
298   if(workstealingproactive && CmiMyPe() == 0)
299       CmiPrintf("Charm++> Steal work when load is fewer than 3. \n");
300 }
301
302
303 void CldModuleInit(char **argv)
304 {
305   CpvInitialize(int, CldHandlerIndex);
306   CpvInitialize(int, CldRelocatedMessages);
307   CpvInitialize(int, CldLoadBalanceMessages);
308   CpvInitialize(int, CldMessageChunks);
309   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
310   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
311   CpvAccess(CldMessageChunks) = 0;
312
313   CldModuleGeneralInit(argv);
314   CldGraphModuleInit(argv);
315
316   CpvAccess(CldLoadNotify) = 1;
317
318   CpvInitialize(int, isStealing);
319   CpvAccess(isStealing) = 0;
320 }
321
322 void CldCallback()
323 {}