build: fix travis MPI/SMP build
[charm.git] / src / conv-ldb / cldb.prioritycentralized.C
#include "converse.h"
#include "cldb.prioritycentralized.h"
#include "queueing.h"
#include "cldb.h"

#include <stdlib.h>

#include "priorityqueue.C"
7
/* Compile-time tunables for the priority-centralized seed balancer. */
#define IDLE_IMMEDIATE          1   /* not referenced in the visible part of this file */
#define TRACE_USEREVENTS        0   /* not referenced in the visible part of this file */

/* CldStillIdle() skips sending a new idle request while less than
   PERIOD*0.001 timer units have elapsed since lastCheck
   (presumably PERIOD is in milliseconds -- confirm against CmiWallTimer). */
#define PERIOD 50                /* default: 30 */
#define MAXOVERLOAD 1            /* not referenced in the visible part of this file */

/* Set to 1 to compile in the CmiPrintf tracing guarded by #if YH_DEBUG. */
#define YH_DEBUG 0
/* Per-worker load bound: the master only hands a queued task to a
   requesting worker whose recorded load is below this value, and it is
   also used in the worker score computed in CldStoreCharemsg(). */
#define THRESHOLD_LOAD 5
/* Value stored as average_priority for a worker with no recorded load. */
#define _U_INT_MAX 2000000000

#define LOAD_WEIGHT 0.1          /* not referenced in the visible part of this file */
/* Scale factor applied to a worker's average priority in CldStoreCharemsg(). */
#define PRIOR_WEIGHT 0.1
20
/* Per-PE balancer state; the fields used in this file are lastCheck, sent and load. */
CpvDeclare(CldProcInfo, CldData);
extern char *_lbtopo;                   /* topology name string */
int _lbsteal = 0;                       /* work stealing flag; forced to 1 in CldOtherInit() */

/* Heap of tasks (wrapped in priormsg) that the master could not place yet. */
CpvDeclare(MsgHeap, CldManagerLoadQueue);
CpvDeclare(CldSlavePriorInfo*, CldSlavesPriorityQueue); //maintained on the master: per-PE load and average priority of the tasks sent to it

/* Converse handler indices; all are registered in CldModuleInit(). */
CpvDeclare(int, CldAskLoadHandlerIndex);
CpvDeclare(int,  CldstorecharemsgHandlerIndex);
CpvDeclare(int, CldHigherPriorityComesHandlerIndex);
CpvDeclare(int, CldReadytoExecHandlerIndex);
/* Created with CqsCreate() in CldModuleInit(); not otherwise used in the visible code. */
CpvDeclare(void*, CldRequestQueue);
33
34 void LoadNotifyFn(int l)
35 {
36   CldProcInfo  cldData = CpvAccess(CldData);
37   cldData->sent = 0;
38 }
39
/* Returns the name of this seed load balancing strategy. */
const char *CldGetStrategy(void)
{
  static const char strategy_name[] = "prioritycentralized";
  return strategy_name;
}
44
45 void SendTasktoPe(int receiver, void *msg)
46 {
47     CldInfoFn ifn; 
48     CldPackFn pfn;
49     int len, queueing, priobits, avg;
50     unsigned int *prioptr;
51     int old_load;
52     int new_load;
53
54     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
55     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
56     CldRestoreHandler((char *)msg);
57     CldSwitchHandler((char *)msg, CpvAccess(CldHigherPriorityComesHandlerIndex));
58     CmiSyncSendAndFree(receiver, len, msg);
59
60     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
61     new_load = old_load + 1;
62     if(old_load == 0)
63     {
64         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = *prioptr;
65     }else
66     {
67         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority/(new_load)*old_load + *prioptr/(new_load);
68     }
69     CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
70
71 #if YH_DEBUG
72     CmiPrintf(" P%d====>P%d sending this msg with prior %u  to processor %d len=%d \n", CmiMyPe(), receiver, *prioptr, receiver, len);
73 #endif
74 }
75 /* master processor , what to do when receive a new msg from network */
76 static void CldStoreCharemsg(void *msg)
77 {
78     /* insert the message into priority queue*/
79     CldInfoFn ifn; 
80     CldPackFn pfn;
81     int len, queueing, priobits, avg;
82     unsigned int *prioptr;
83     priormsg *p_msg ;
84     /* delay request msg */
85     requestmsg *request_msg;
86     int request_pe;
87     void* loadmsg;
88
89     /* check whether there is processor with lower priority, it exists, push this task to that processor */
90     /* find the processor with the highest priority */
91     int i=0; 
92     int index = 1;
93     double max_evaluation = 0;
94     int old_load;
95    
96     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
97     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
98 #if YH_DEBUG
99     CmiPrintf(" Step 2: on processor 0, Get new created msg and store it , PRIOR=%u Timer=%f\n", *prioptr, CmiTimer());
100 #endif
101     //check which processor has underload and also the task priority is lower than the msg priority
102     for(i=1; i<CmiNumPes();i++)
103     {   //underload to avoid overflow
104 #if YH_DEBUG
105         CmiPrintf(" processor %d has load num:%d\n", i, CpvAccess(CldSlavesPriorityQueue)[i].load);
106 #endif
107         old_load = CpvAccess(CldSlavesPriorityQueue)[i].load;
108         if(old_load == 0)
109         {
110             index = i;
111             break;
112         }
113         double evaluation = (CpvAccess(CldSlavesPriorityQueue)[i].average_priority)* PRIOR_WEIGHT * (THRESHOLD_LOAD - CpvAccess(CldSlavesPriorityQueue)[i].load);
114         if(evaluation > max_evaluation)
115         {
116             max_evaluation = evaluation;
117             index = i;
118         }
119     }
120     if(old_load == 0 || CpvAccess(CldSlavesPriorityQueue)[index].average_priority > *prioptr)
121     {
122         //send task to that processor
123         SendTasktoPe(index, msg);
124 #if YH_DEBUG
125         CmiPrintf(" Step 2-1: processor 0 send task to idle processor %d, msg prior=%u Timer=%f\n", index, *prioptr, CmiTimer());
126 #endif
127         return;
128     }
129
130     p_msg = (priormsg*)malloc(sizeof(priormsg));
131     p_msg->priority = *prioptr;
132     p_msg->msg = msg;
133     /*Lock here? */
134     if(heap_isFull(&CpvAccess(CldManagerLoadQueue)))
135     {
136         CmiPrintf("Queue is already full, message will be lost\n");
137     }
138     else
139         heap_addItem(&CpvAccess(CldManagerLoadQueue), p_msg);
140 #if YH_DEBUG
141         CmiPrintf(" Step 2-3:  processor 0 , all processors are busy , store this msg  msg prior=%u Queuesize=%d Timer=%f\n", *prioptr, heap_size(&CpvAccess(CldManagerLoadQueue)), CmiTimer());
142 #endif
143
144 }
145 /* immediate message handler, work at node level */
146 /* send some work to requested proc */
147 static void CldAskLoadHandler(requestmsg *msg)
148 {
149     /* pickup the msg with the highest priority */
150     /* response to the requester chare */
151     int receiver, rank, recvIdx, i;
152     void* loadmsg;
153     CldInfoFn ifn; 
154     CldPackFn pfn;
155     int len, queueing, priobits, avg; 
156     unsigned int *prioptr;
157     
158     double old_load;
159     double new_load;
160     double old_average_prior;
161     /* only give you work if I have more than 1 */
162     receiver = msg->from_pe;
163     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
164     old_average_prior = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority;
165 #if YH_DEBUG
166     CmiPrintf(" Step 6 :%f %d<======= getrequest  from processor queue current size=%d, notidle=%d, load=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)), msg->notidle, CpvAccess(CldSlavesPriorityQueue)[receiver].load);
167 #endif
168     if(!msg->notidle || old_load == 0 || old_load == 1)
169     {
170         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = _U_INT_MAX;
171         CpvAccess(CldSlavesPriorityQueue)[receiver].load = 0;
172     }else
173     {
174         new_load = old_load - 1;
175         CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
176         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = old_average_prior/new_load * old_load - msg->priority/new_load;
177     }
178    
179     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
180     if(old_load < THRESHOLD_LOAD)
181     {
182         priormsg *p_msg = heap_extractMin(&CpvAccess(CldManagerLoadQueue));
183         if(p_msg == 0)
184         {
185 #if YH_DEBUG
186             CmiPrintf(" Step 6-1 :%f Queue is empty no task %d<======= getrequest  from processor queue current size=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)));
187 #endif
188         return;
189         }
190         
191         loadmsg = p_msg->msg;
192         SendTasktoPe(receiver, loadmsg);
193     }
194 }
195
196 /***********************/
197 /* since I am idle, ask for work from neighbors */
198 static void CldBeginIdle(void *dummy)
199 {
200     CpvAccess(CldData)->lastCheck = CmiWallTimer();
201 }
202
203 static void CldEndIdle(void *dummy)
204 {
205     CpvAccess(CldData)->lastCheck = -1;
206 }
207
208 static void CldStillIdle(void *dummy, double curT)
209 {
210     if(CmiMyPe() == 0) 
211     {
212 #if YH_DEBUG
213         CmiPrintf(" Processor %d is idle, queue size=%d \n", CmiMyPe(), heap_size(&CpvAccess(CldManagerLoadQueue)) );
214 #endif
215         return;
216     }else
217     {
218 #if YH_DEBUG
219         CmiPrintf("Processor %d, has task number of %d\n", CmiMyPe(), CpvAccess(CldData)->load); 
220 #endif
221     }
222
223     int i;
224     double startT;
225     requestmsg msg;
226     CldProcInfo  cldData = CpvAccess(CldData);
227     double now = curT;
228     double lt = cldData->lastCheck;
229    
230     cldData->load  = 0;
231     msg.notidle = 0;
232     if ((lt!=-1 && now-lt< PERIOD*0.001) ) return;
233 #if YH_DEBUG
234     CmiPrintf("Step 1000: processor %d task is already zero ", CmiMyPe());
235 #endif
236
237     cldData->lastCheck = now;
238     msg.from_pe = CmiMyPe();
239     CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
240
241     cldData->sent = 1;
242 #if YH_DEBUG
243     CmiPrintf("Step 1000: processor %d task is already zero sentidle=%d", CmiMyPe(), (&msg)->notidle);
244 #endif
245     CmiSyncSend(0, sizeof(requestmsg), &msg);
246 }
247 void CldReadytoExec(void *msg)
248 {
249
250     CldProcInfo  cldData = CpvAccess(CldData);
251     CldRestoreHandler((char *)msg);
252     CmiHandleMessage(msg);
253     cldData->load = cldData->load - 1;
254
255     requestmsg r_msg;
256
257     r_msg.notidle = 1;
258     r_msg.from_pe = CmiMyPe();
259     CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
260     CmiSyncSend(0, sizeof(requestmsg), &r_msg);
261
262 #if YH_DEBUG
263     CmiPrintf(" Step final: message is handled on processor %d, task left=%d", CmiMyPe(), cldData->load);
264 #endif
265 }
266 void HigherPriorityWork(void *msg)
267 {
268     //wrap this msg with token and put it into token queue
269     
270     CldInfoFn ifn;
271     CldPackFn pfn;
272     int len, queueing, priobits; 
273     unsigned int *prioptr;
274     CldProcInfo  cldData = CpvAccess(CldData);
275     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
276     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
277     CldRestoreHandler((char *)msg);
278     CldSwitchHandler((char *)msg, CpvAccess(CldReadytoExecHandlerIndex));
279     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
280     cldData->load = cldData->load  + 1;
281
282 #if YH_DEBUG
283     CmiPrintf(" Step 3:  processor %d, Task arrives and put it into charm++ queue, prior=%u Timer=%f\n", CmiMyPe(), *prioptr, CmiTimer());
284 #endif
285 }
286
287
288 void CldEnqueue(int pe, void *msg, int infofn)
289 {
290   int len, queueing, priobits, avg; 
291   unsigned int *prioptr;
292   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
293   CldPackFn pfn;
294  
295   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
296   CmiSetInfo(msg,infofn);
297 #if YH_DEBUG
298   CmiPrintf(" Step 1: Creation New msg on pe %d priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(),  *prioptr, CmiTimer(), len);
299 #endif
300
301   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
302       pe = CmiMyPe();
303     /* always pack the message because the message may be move away
304        to a different processor later by CldGetToken() */
305       CldSwitchHandler((char *)msg, CpvAccess(CldstorecharemsgHandlerIndex));
306       if(pe == 0)
307       {
308           CldStoreCharemsg(msg);
309       }else{
310           if (pfn && CmiNumNodes()>1) {
311               pfn(&msg);
312               ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
313           }
314 #if YH_DEBUG
315           CmiPrintf(" Step 1-1: Creation New msg on pe%d ==> p0  priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(),  *prioptr, CmiTimer(), len);
316 #endif
317
318           CmiSyncSendAndFree(0, len, msg);
319       }
320   }else if((pe == CmiMyPe()) || (CmiNumPes() == 1) ) {
321   
322       CsdEnqueueGeneral(msg, CQS_QUEUEING_IFIFO, priobits, prioptr);
323   }else {
324       if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
325           pfn(&msg);
326           ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
327       }
328       if (pe==CLD_BROADCAST) 
329           CmiSyncBroadcastAndFree(len, msg);
330       else if (pe==CLD_BROADCAST_ALL)
331           CmiSyncBroadcastAllAndFree(len, msg);
332       else CmiSyncSendAndFree(pe, len, msg);
333
334   }
335 }
336
337 void CldHandler(char *msg)
338 {
339   int len, queueing, priobits;
340   unsigned int *prioptr;
341   CldInfoFn ifn; CldPackFn pfn;
342   CldRestoreHandler(msg);
343   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
344   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
345   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
346 }
347
348 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
349 {
350   int len, queueing, priobits,i; 
351   unsigned int *prioptr;
352   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
353   CldPackFn pfn;
354   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
355   if (pfn) {
356     pfn(&msg);
357     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
358   }
359   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
360   CmiSetInfo(msg,infofn);
361
362   CmiSyncMulticastAndFree(grp, len, msg);
363 }
364
365 void  CldOtherInit()
366 {
367
368   CpvInitialize(CldProcInfo, CldData);
369   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
370   CpvAccess(CldData)->lastCheck = -1;
371   CpvAccess(CldData)->sent = 0;
372   CpvAccess(CldData)->load = 0;
373 #if 1
374   _lbsteal = 1;//CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
375   if (_lbsteal) {
376   /* register idle handlers - when idle, keep asking work from neighbors */
377   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
378       (CcdVoidFn) CldBeginIdle, NULL);
379   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
380       (CcdVoidFn) CldStillIdle, NULL);
381   CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
382       (CcdVoidFn) CldEndIdle, NULL);
383     if (CmiMyPe() == 0) 
384       CmiPrintf("Charm++> Work stealing is enabled. \n");
385   }
386 #endif
387     
388
389   if (CmiMyPe() == 0){
390       int numpes = CmiNumPes();
391       CpvAccess(CldSlavesPriorityQueue) = (CldSlavePriorInfo*)CmiAlloc(sizeof(CldSlavePriorInfo) * numpes);
392       int i=0;
393       for(i=0; i<numpes; i++){
394           CpvAccess(CldSlavesPriorityQueue)[i].average_priority = _U_INT_MAX;
395           CpvAccess(CldSlavesPriorityQueue)[i].load = 0;
396       }
397   }
398 }
399
400 void CldModuleInit(char **argv)
401 {
402
403   CpvInitialize(int, CldHandlerIndex);
404   CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
405   /* Yanhua */
406   CpvInitialize(int, CldAskLoadHandlerIndex);
407   CpvInitialize(int, CldstorecharemsgHandlerIndex);
408   CpvInitialize(int, CldHigherPriorityComesHandlerIndex);
409   CpvInitialize(int, CldReadytoExecHandlerIndex);
410   CpvInitialize(MsgHeap, CldManagerLoadQueue);
411   CpvInitialize(CldSlavePriorInfo*, CldSlavesPriorityQueue);
412   CpvInitialize(void*, CldRequestQueue);
413
414   CpvAccess(CldstorecharemsgHandlerIndex) = CmiRegisterHandler(CldStoreCharemsg);
415   CpvAccess(CldHigherPriorityComesHandlerIndex) = CmiRegisterHandler(HigherPriorityWork);
416   CpvAccess(CldAskLoadHandlerIndex) = CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
417   CpvAccess(CldReadytoExecHandlerIndex) = CmiRegisterHandler((CmiHandler)CldReadytoExec);
418   CpvAccess(CldRequestQueue) = (void *)CqsCreate();
419   CldModuleGeneralInit(argv);
420   
421   CldOtherInit();
422   
423   CpvAccess(CldLoadNotify) = 1;
424   //CpvAccess(tokenqueue)->head->succ = CpvAccess(tokenqueue)->tail;
425
426  
427 }
428
429
430
431 void CldNodeEnqueue(int node, void *msg, int infofn)
432 {
433   int len, queueing, priobits; 
434   unsigned int *prioptr;
435   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
436   CldPackFn pfn;
437   if (node == CLD_ANYWHERE) {
438     node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
439     if (node != CmiMyNode())
440       CpvAccess(CldRelocatedMessages)++;
441   }
442   if (node == CmiMyNode() && !CmiImmIsRunning()) {
443     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
444     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
445   } else {
446     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
447     if (pfn) {
448       pfn(&msg);
449       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
450     }
451     CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
452     CmiSetInfo(msg,infofn);
453     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
454     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
455     else CmiSyncNodeSendAndFree(node, len, msg);
456   }
457 }
458
459 void CldEnqueueMulti(int npes, const int *pes, void *msg, int infofn)
460 {
461   int len, queueing, priobits,i; 
462   unsigned int *prioptr;
463   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
464   CldPackFn pfn;
465   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
466   if (pfn) {
467     pfn(&msg);
468     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
469   }
470   CldSwitchHandler((char *)msg, CpvAccess(CldHandlerIndex));
471   CmiSetInfo(msg,infofn);
472
473   CmiSyncListSendAndFree(npes, pes, len, msg);
474 }
475
476
/* Callback hook; this strategy has nothing to do here. */
void CldCallback()
{
  /* intentionally empty */
}