Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-ldb / cldb.workstealing.c
1 #include <stdlib.h>
2
3 #include "converse.h"
4 #include "cldb.workstealing.h"
5 #include "queueing.h"
6 #include "cldb.h"
7
8 #define IDLE_IMMEDIATE          1
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 10                /* default: 30 */
12 #define MSGDELAY 10
13 #define MAXOVERLOAD 1
14
15 #define LOADTHRESH       3
16
17
18 typedef struct CldProcInfo_s {
19   //double lastCheck;
20   //int    sent;                        /* flag to disable idle work request */
21   int    balanceEvt;            /* user event for balancing */
22   int    idleEvt;               /* user event for idle balancing */
23   int    idleprocEvt;           /* user event for processing idle req */
24   //double lastBalanceTime;
25 } *CldProcInfo;
26
27 int _stealonly1 = 0;
28
29 CpvStaticDeclare(CldProcInfo, CldData);
30 CpvStaticDeclare(int, CldAskLoadHandlerIndex);
31 CpvStaticDeclare(int, CldAckNoTaskHandlerIndex);
32
33 void LoadNotifyFn(int l)
34 {
35   //CldProcInfo  cldData = CpvAccess(CldData);
36   //cldData->sent = 0;
37 }
38
39 char *CldGetStrategy(void)
40 {
41   return "work stealing";
42 }
43
44 /* since I am idle, ask for work from neighbors */
45
46
47 static void CldBeginIdle(void *dummy)
48 {
49   int i;
50   double startT;
51   requestmsg msg;
52   int myload;
53   CldProcInfo  cldData = CpvAccess(CldData);
54   int  victim;
55   int mype;
56   int numpes;
57
58   myload = CldLoad();
59   //if (myload > 0) return; // I do not think this will be true when a processor is idle. overhead code
60
61   mype = CmiMyPe();
62   msg.from_pe = mype;
63   numpes = CmiNumPes();
64   do{
65       victim = (((CrnRand()+mype)&0x7FFFFFFF)%numpes);
66   }while(victim == mype);
67
68   CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
69 #if IDLE_IMMEDIATE
70   /* fixme */
71   CmiBecomeImmediate(&msg);
72 #endif
73   msg.to_rank = CmiRankOf(victim);
74   CmiSyncSend(victim, sizeof(requestmsg),(char *)&msg);
75   
76   //cldData->sent = 1;
77 #if CMK_TRACE_ENABLED && TRACE_USEREVENTS
78   traceUserBracketEvent(cldData->idleEvt, now, CmiWallTimer());
79 #endif
80 }
81
82 /* immediate message handler, work at node level */
83 /* send some work to requested proc */
84 static void CldAskLoadHandler(requestmsg *msg)
85 {
86   int receiver, rank, recvIdx, i;
87   int myload = CldLoad();
88   double now = CmiWallTimer();
89   CldProcInfo  cldData = CpvAccess(CldData);
90
91   int sendLoad;
92   sendLoad = myload / 2; 
93   receiver = msg->from_pe;
94   /* only give you work if I have more than 1 */
95   if (myload>LOADTHRESH) {
96       if(_stealonly1) sendLoad = 1;
97       rank = CmiMyRank();
98       if (msg->to_rank != -1) rank = msg->to_rank;
99       CldMultipleSend(receiver, sendLoad, rank, 0);
100   }else
101   {
102       requestmsg r_msg;
103       r_msg.from_pe = CmiMyPe();
104       r_msg.to_rank = CmiMyRank();
105
106       CmiSetHandler(&r_msg, CpvAccess(CldAckNoTaskHandlerIndex));
107       CmiSyncSend(receiver, sizeof(requestmsg),(char *)&r_msg);
108     /* send ack indicating there is no task */
109   }
110   CmiFree(msg);
111 }
112
113 void  CldAckNoTaskHandler(requestmsg *msg)
114 {
115     int victim; 
116     requestmsg r_msg;
117     int notaskpe = msg->from_pe;
118     CldProcInfo  cldData = CpvAccess(CldData);
119     int mype = CmiMyPe();
120   do{
121       victim = (((CrnRand()+notaskpe)&0x7FFFFFFF)%CmiNumPes());
122   }while(victim == mype || victim == notaskpe);
123
124   /* fixme */
125   //CmiBecomeImmediate(&msg);
126   r_msg.to_rank = CmiRankOf(victim);
127   r_msg.from_pe = mype;
128   CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
129   CmiSyncSend(victim, sizeof(requestmsg),(char *)&r_msg);
130   //cldData->sent = 1;
131
132   //cldData->lastCheck = CmiWallTimer();
133
134   CmiFree(msg);
135
136 }
137 void CldHandler(void *msg)
138 {
139   CldInfoFn ifn; CldPackFn pfn;
140   int len, queueing, priobits; unsigned int *prioptr;
141   
142   CldRestoreHandler(msg);
143   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
144   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
145   /*CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr); */
146   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
147 }
148
149 void CldBalanceHandler(void *msg)
150 {
151   CldRestoreHandler(msg);
152   CldPutToken(msg);
153 }
154
155 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
156 {
157   int len, queueing, priobits,i; unsigned int *prioptr;
158   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
159   CldPackFn pfn;
160   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
161   if (pfn) {
162     pfn(&msg);
163     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
164   }
165   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
166   CmiSetInfo(msg,infofn);
167
168   CmiSyncMulticastAndFree(grp, len, msg);
169 }
170
171 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
172 {
173   int len, queueing, priobits,i; unsigned int *prioptr;
174   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
175   CldPackFn pfn;
176   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
177   if (pfn) {
178     pfn(&msg);
179     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
180   }
181   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
182   CmiSetInfo(msg,infofn);
183   CmiSyncListSendAndFree(npes, pes, len, msg);
184 }
185
186 void CldEnqueue(int pe, void *msg, int infofn)
187 {
188   int len, queueing, priobits, avg; unsigned int *prioptr;
189   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
190   CldPackFn pfn;
191
192   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
193       pe = CmiMyPe();
194     /* always pack the message because the message may be move away
195        to a different processor later by CldGetToken() */
196     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
197     if (pfn && CmiNumNodes()>1) {
198        pfn(&msg);
199        ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
200     }
201     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
202     CmiSetInfo(msg,infofn);
203     CldPutToken(msg);
204   } 
205   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
206     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
207     //CsdEnqueueGeneral(msg, CQS_QUEUEING_LIFO, priobits, prioptr);
208     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
209   }
210   else {
211     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
212     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
213       pfn(&msg);
214       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
215     }
216     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
217     CmiSetInfo(msg,infofn);
218     if (pe==CLD_BROADCAST) 
219       CmiSyncBroadcastAndFree(len, msg);
220     else if (pe==CLD_BROADCAST_ALL)
221       CmiSyncBroadcastAllAndFree(len, msg);
222     else CmiSyncSendAndFree(pe, len, msg);
223   }
224 }
225
226 void CldNodeEnqueue(int node, void *msg, int infofn)
227 {
228   int len, queueing, priobits, pe, avg; unsigned int *prioptr;
229   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
230   CldPackFn pfn;
231   if ((node == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
232       pe = CmiMyPe();
233       node = CmiNodeOf(pe);
234       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
235       CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
236   }
237   else if ((node == CmiMyNode()) || (CmiNumPes() == 1)) {
238     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
239     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
240   } 
241   else {
242     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
243     if (pfn) {
244         pfn(&msg);
245         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
246     }
247     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
248     CmiSetInfo(msg,infofn);
249     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
250     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
251     else CmiSyncNodeSendAndFree(node, len, msg);
252   }
253 }
254
255
256 void CldGraphModuleInit(char **argv)
257 {
258   CpvInitialize(CldProcInfo, CldData);
259   CpvInitialize(int, CldAskLoadHandlerIndex);
260   CpvInitialize(int, CldAckNoTaskHandlerIndex);
261   CpvInitialize(int, CldBalanceHandlerIndex);
262
263   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
264   //CpvAccess(CldData)->lastCheck = -1;
265   //CpvAccess(CldData)->sent = 0;
266 #if CMK_TRACE_ENABLED
267   CpvAccess(CldData)->balanceEvt = traceRegisterUserEvent("CldBalance", -1);
268   CpvAccess(CldData)->idleEvt = traceRegisterUserEvent("CldBalanceIdle", -1);
269   CpvAccess(CldData)->idleprocEvt = traceRegisterUserEvent("CldBalanceProcIdle", -1);
270 #endif
271
272   CpvAccess(CldBalanceHandlerIndex) = 
273     CmiRegisterHandler(CldBalanceHandler);
274   CpvAccess(CldAskLoadHandlerIndex) = 
275     CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
276   
277   CpvAccess(CldAckNoTaskHandlerIndex) = 
278     CmiRegisterHandler((CmiHandler)CldAckNoTaskHandler);
279
280   /* communication thread */
281   if (CmiMyRank() == CmiMyNodeSize())  return;
282
283   _stealonly1 = CmiGetArgFlagDesc(argv, "+stealonly1", "Charm++> Work Stealing, every time only steal 1 task");
284
285 #if 1
286   /* register idle handlers - when idle, keep asking work from neighbors */
287   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
288       (CcdVoidFn) CldBeginIdle, NULL);
289     if (CmiMyPe() == 0) 
290       CmiPrintf("Charm++> Work stealing is enabled. \n");
291 #endif
292 }
293
294
295 void CldModuleInit(char **argv)
296 {
297   CpvInitialize(int, CldHandlerIndex);
298   CpvInitialize(int, CldRelocatedMessages);
299   CpvInitialize(int, CldLoadBalanceMessages);
300   CpvInitialize(int, CldMessageChunks);
301   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
302   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
303   CpvAccess(CldMessageChunks) = 0;
304
305   CldModuleGeneralInit(argv);
306   CldGraphModuleInit(argv);
307
308   CpvAccess(CldLoadNotify) = 1;
309 }
310
311 void CldCallback()
312 {}