33261d346a3ccb2b0f5156138e4afd5df98a50f4
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          0
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   int    balanceEvt;            /* user event for balancing */
20   int    idleEvt;               /* user event for idle balancing */
21   int    idleprocEvt;           /* user event for processing idle req */
22 } *CldProcInfo;
23
24 int _stealonly1 = 0;
25
26 CpvStaticDeclare(CldProcInfo, CldData);
27 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
28 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
29
30 void LoadNotifyFn(int l)
31 {
32 }
33
34 char *CldGetStrategy(void)
35 {
36   return "work stealing";
37 }
38
39 /* since I am idle, ask for work from neighbors */
40
41 static void CldBeginIdle(void *dummy)
42 {
43   int i;
44   double startT;
45   requestmsg msg;
46   int myload;
47   int  victim;
48   int mype;
49   int numpes;
50
51   CcdRaiseCondition(CcdUSER);
52
53   myload = CldLoad();
54
55   mype = CmiMyPe();
56   msg.from_pe = mype;
57   numpes = CmiNumPes();
58   do{
59       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
60   }while(victim == mype);
61
62   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
63 #if IDLE_IMMEDIATE
64   /* fixme */
65   CmiBecomeImmediate(&msg);
66 #endif
67   msg.to_rank = CmiRankOf(victim);
68   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
69   
70 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
71   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
72 #endif
73 }
74
75 /* immediate message handler, work at node level */
76 /* send some work to requested proc */
77 static void CldAskLoadHandler(requestmsg *msg)
78 {
79   int receiver, rank, recvIdx, i;
80   int myload = CldLoad();
81
82   int sendLoad;
83   sendLoad = myload / 2; 
84   receiver = msg->from_pe;
85   /* only give you work if I have more than 1 */
86   if (myload>LOADTHRESH) {
87       if(_stealonly1) sendLoad = 1;
88       rank = CmiMyRank();
89       if (msg->to_rank != -1) rank = msg->to_rank;
90       CldMultipleSend(receiver, sendLoad, rank, 0);
91   }else
92   {
93       requestmsg r_msg;
94       r_msg.from_pe = CmiMyPe();
95       r_msg.to_rank = CmiMyRank();
96
97       CcdRaiseCondition(CcdUSER);
98
99       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
100       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
101     /* send ack indicating there is no task */
102   }
103   CmiFree(msg);
104 }
105
106 void  CldAckNoTaskHandler(requestmsg *msg)
107 {
108   int victim; 
109   int notaskpe = msg->from_pe;
110   int mype = CmiMyPe();
111
112   CcdRaiseCondition(CcdUSER);
113
114   do{
115       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
116   }while(victim == mype);
117
118   /* reuse msg */
119   msg->to_rank = CmiRankOf(victim);
120   msg->from_pe = mype;
121   CmiSetHandler(msg, CpvAccess(CldAskLoadHandlerIndex));
122   CmiSyncSendAndFree(victim, sizeof(requestmsg),(char *)msg);
123
124 }
125
126 void CldHandler(void *msg)
127 {
128   CldInfoFn ifn; CldPackFn pfn;
129   int len, queueing, priobits; unsigned int *prioptr;
130   
131   CldRestoreHandler(msg);
132   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
133   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
134   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
135 }
136
137 void CldBalanceHandler(void *msg)
138 {
139   CldRestoreHandler(msg);
140   CldPutToken(msg);
141 }
142
143 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
144 {
145   int len, queueing, priobits,i; unsigned int *prioptr;
146   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
147   CldPackFn pfn;
148   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
149   if (pfn) {
150     pfn(&msg);
151     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
152   }
153   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
154   CmiSetInfo(msg,infofn);
155
156   CmiSyncMulticastAndFree(grp, len, msg);
157 }
158
159 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
160 {
161   int len, queueing, priobits,i; unsigned int *prioptr;
162   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
163   CldPackFn pfn;
164   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
165   if (pfn) {
166     pfn(&msg);
167     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
168   }
169   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
170   CmiSetInfo(msg,infofn);
171   CmiSyncListSendAndFree(npes, pes, len, msg);
172 }
173
174 void CldEnqueue(int pe, void *msg, int infofn)
175 {
176   int len, queueing, priobits, avg; unsigned int *prioptr;
177   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
178   CldPackFn pfn;
179
180   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
181       pe = CmiMyPe();
182     /* always pack the message because the message may be move away
183        to a different processor later by CldGetToken() */
184     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
185     if (pfn && CmiNumNodes()>1) {
186        pfn(&msg);
187        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
188     }
189     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
190     CmiSetInfo(msg,infofn);
191     CldPutToken(msg);
192   } 
193   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
194     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
195     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
196   }
197   else {
198     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
199     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
200       pfn(&msg);
201       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
202     }
203     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
204     CmiSetInfo(msg,infofn);
205     if (pe==CLD_BROADCAST) 
206       CmiSyncBroadcastAndFree(len, msg);
207     else if (pe==CLD_BROADCAST_ALL)
208       CmiSyncBroadcastAllAndFree(len, msg);
209     else CmiSyncSendAndFree(pe, len, msg);
210   }
211 }
212
213 void CldNodeEnqueue(int node, void *msg, int infofn)
214 {
215   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
216   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
217   CldPackFn pfn;
218   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
219       pe = CmiMyPe();
220       node = CmiNodeOf(pe);
221       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
222       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
223   }
224   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
225     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
226     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
227   } 
228   else {
229     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
230     if (pfn) {
231         pfn(&msg);
232         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
233     }
234     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
235     CmiSetInfo(msg,infofn);
236     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
237     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
238     else CmiSyncNodeSendAndFree(node, len, msg);
239   }
240 }
241
242
243 void CldGraphModuleInit(char **argv)
244 {
245   CpvInitialize(CldProcInfo, CldData);
246   CpvInitialize(int, CldAskLoadHandlerIndex);
247   CpvInitialize(int, CldAckNoTaskHandlerIndex);
248   CpvInitialize(int, CldBalanceHandlerIndex);
249
250   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
251 #if CMK_TRACE_ENABLED
252   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
253   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
254   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
255 #endif
256
257   CpvAccess(CldBalanceHandlerIndex) = 
258     CmiRegisterHandler(CldBalanceHandler);
259   CpvAccess(CldAskLoadHandlerIndex) = 
260     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
261   
262   CpvAccess(CldAckNoTaskHandlerIndex) = 
263     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
264
265   /* communication thread */
266   if (CmiMyRank() == CmiMyNodeSize())  return;
267
268   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
269
270   /* register idle handlers - when idle, keep asking work from neighbors */
271   if(CmiNumPes() > 1)
272   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
273       (CcdVoidFn) CldBeginIdle, NULL);
274   if (CmiMyPe() == 0) 
275       CmiPrintf("Charm++> Work stealing is enabled. \n");
276 }
277
278
279 void CldModuleInit(char **argv)
280 {
281   CpvInitialize(int, CldHandlerIndex);
282   CpvInitialize(int, CldRelocatedMessages);
283   CpvInitialize(int, CldLoadBalanceMessages);
284   CpvInitialize(int, CldMessageChunks);
285   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
286   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
287   CpvAccess(CldMessageChunks) = 0;
288
289   CldModuleGeneralInit(argv);
290
291   CpvAccess(CldLoadNotify) = 1;
292 }
293
294 void CldCallback()
295 {}