Remove archaic CVS keyword header comment blocks
[charm.git] / src / conv-ldb / cldb.prioritycentralized.c
1 #include "converse.h"
2 #include "cldb.prioritycentralized.h"
3 #include "queueing.h"
4 #include "cldb.h"
5
6 #include "priorityqueue.c"
7
8 #define IDLE_IMMEDIATE          1
9 #define TRACE_USEREVENTS        0
10
11 #define PERIOD 50                /* default: 30 */
12 #define MAXOVERLOAD 1
13
14 #define YH_DEBUG 0
15 #define THRESHOLD_LOAD 5
16 #define _U_INT_MAX 2000000000
17
18 #define LOAD_WEIGHT 0.1
19 #define PRIOR_WEIGHT 0.1
20
21 CpvDeclare(CldProcInfo, CldData);
22 extern char *_lbtopo;                   /* topology name string */
23 int _lbsteal = 0;                       /* work stealing flag */
24
25 CpvDeclare(MsgHeap, CldManagerLoadQueue);
26 CpvDeclare(CldSlavePriorInfo*, CldSlavesPriorityQueue); //maintened in master to check which processor has which priority
27
28 CpvDeclare(int, CldAskLoadHandlerIndex);
29 CpvDeclare(int,  CldstorecharemsgHandlerIndex);
30 CpvDeclare(int, CldHigherPriorityComesHandlerIndex);
31 CpvDeclare(int, CldReadytoExecHandlerIndex);
32 CpvDeclare(void*, CldRequestQueue);
33
34 void LoadNotifyFn(int l)
35 {
36   CldProcInfo  cldData = CpvAccess(CldData);
37   cldData->sent = 0;
38 }
39
/* Returns the name of this seed load-balancing strategy. */
char *CldGetStrategy(void)
{
  char *strategy_name = "prioritycentralized";
  return strategy_name;
}
44
45 void SendTasktoPe(int receiver, void *msg)
46 {
47     CldInfoFn ifn; 
48     CldPackFn pfn;
49     int len, queueing, priobits, avg;
50     unsigned int *prioptr;
51     int old_load;
52     int new_load;
53
54     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
55     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
56     CldRestoreHandler(msg);
57     CldSwitchHandler(msg, CpvAccess(CldHigherPriorityComesHandlerIndex));
58     CmiSyncSendAndFree(receiver, len, msg);
59
60     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
61     new_load = old_load + 1;
62     if(old_load == 0)
63     {
64         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = *prioptr;
65     }else
66     {
67         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority/(new_load)*old_load + *prioptr/(new_load);
68     }
69     CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
70
71 #if YH_DEBUG
72     CmiPrintf(" P%d====>P%d sending this msg with prior %u  to processor %d len=%d \n", CmiMyPe(), receiver, *prioptr, receiver, len);
73 #endif
74 }
75 /* master processor , what to do when receive a new msg from network */
76 static void CldStoreCharemsg(void *msg)
77 {
78     /* insert the message into priority queue*/
79     CldInfoFn ifn; 
80     CldPackFn pfn;
81     int len, queueing, priobits, avg;
82     unsigned int *prioptr;
83     priormsg *p_msg ;
84     /* delay request msg */
85     requestmsg *request_msg;
86     int request_pe;
87     void* loadmsg;
88
89     /* check whether there is processor with lower priority, it exists, push this task to that processor */
90     /* find the processor with the highest priority */
91     int i=0; 
92     unsigned int _max_ = *prioptr;
93     int index = 1;
94     double max_evaluation = 0;
95     int old_load;
96    
97     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
98     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
99 #if YH_DEBUG
100     CmiPrintf(" Step 2: on processor 0, Get new created msg and store it , PRIOR=%u Timer=%f\n", *prioptr, CmiTimer());
101 #endif
102     //check which processor has underload and also the task priority is lower than the msg priority
103     for(i=1; i<CmiNumPes();i++)
104     {   //underload to avoid overflow
105 #if YH_DEBUG
106         CmiPrintf(" processor %d has load num:%d\n", i, CpvAccess(CldSlavesPriorityQueue)[i].load);
107 #endif
108         old_load = CpvAccess(CldSlavesPriorityQueue)[i].load;
109         if(old_load == 0)
110         {
111             index = i;
112             break;
113         }
114         double evaluation = (CpvAccess(CldSlavesPriorityQueue)[i].average_priority)* PRIOR_WEIGHT * (THRESHOLD_LOAD - CpvAccess(CldSlavesPriorityQueue)[i].load);
115         if(evaluation > max_evaluation)
116         {
117             max_evaluation = evaluation;
118             index = i;
119         }
120     }
121     if(old_load == 0 || CpvAccess(CldSlavesPriorityQueue)[index].average_priority > *prioptr)
122     {
123         //send task to that processor
124         SendTasktoPe(index, msg);
125 #if YH_DEBUG
126         CmiPrintf(" Step 2-1: processor 0 send task to idle processor %d, msg prior=%u Timer=%f\n", index, *prioptr, CmiTimer());
127 #endif
128         return;
129     }
130
131     p_msg = (priormsg*)malloc(sizeof(priormsg));
132     p_msg->priority = *prioptr;
133     p_msg->msg = msg;
134     /*Lock here? */
135     if(heap_isFull(&CpvAccess(CldManagerLoadQueue)))
136     {
137         CmiPrintf("Queue is already full, message will be lost\n");
138     }
139     else
140         heap_addItem(&CpvAccess(CldManagerLoadQueue), p_msg);
141 #if YH_DEBUG
142         CmiPrintf(" Step 2-3:  processor 0 , all processors are busy , store this msg  msg prior=%u Queuesize=%d Timer=%f\n", *prioptr, heap_size(&CpvAccess(CldManagerLoadQueue)), CmiTimer());
143 #endif
144
145 }
146 /* immediate message handler, work at node level */
147 /* send some work to requested proc */
148 static void CldAskLoadHandler(requestmsg *msg)
149 {
150     /* pickup the msg with the highest priority */
151     /* response to the requester chare */
152     int receiver, rank, recvIdx, i;
153     void* loadmsg;
154     CldInfoFn ifn; 
155     CldPackFn pfn;
156     int len, queueing, priobits, avg; 
157     unsigned int *prioptr;
158     
159     double old_load;
160     double new_load;
161     double old_average_prior;
162     /* only give you work if I have more than 1 */
163     receiver = msg->from_pe;
164     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
165     old_average_prior = CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority;
166 #if YH_DEBUG
167     CmiPrintf(" Step 6 :%f %d<======= getrequest  from processor queue current size=%d, notidle=%d, load=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)), msg->notidle, CpvAccess(CldSlavesPriorityQueue)[receiver].load);
168 #endif
169     if(!msg->notidle || old_load == 0 || old_load == 1)
170     {
171         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = _U_INT_MAX;
172         CpvAccess(CldSlavesPriorityQueue)[receiver].load = 0;
173     }else
174     {
175         new_load = old_load - 1;
176         CpvAccess(CldSlavesPriorityQueue)[receiver].load = new_load;
177         CpvAccess(CldSlavesPriorityQueue)[receiver].average_priority = old_average_prior/new_load * old_load - msg->priority/new_load;
178     }
179    
180     old_load = CpvAccess(CldSlavesPriorityQueue)[receiver].load;
181     if(old_load < THRESHOLD_LOAD)
182     {
183         priormsg *p_msg = heap_extractMin(&CpvAccess(CldManagerLoadQueue));
184         if(p_msg == 0)
185         {
186 #if YH_DEBUG
187             CmiPrintf(" Step 6-1 :%f Queue is empty no task %d<======= getrequest  from processor queue current size=%d\n", CmiTimer(), receiver, heap_size( &CpvAccess(CldManagerLoadQueue)));
188 #endif
189         return;
190         }
191         
192         loadmsg = p_msg->msg;
193         SendTasktoPe(receiver, loadmsg);
194     }
195 }
196
197 /***********************/
198 /* since I am idle, ask for work from neighbors */
199 static void CldBeginIdle(void *dummy)
200 {
201     CpvAccess(CldData)->lastCheck = CmiWallTimer();
202 }
203
204 static void CldEndIdle(void *dummy)
205 {
206     CpvAccess(CldData)->lastCheck = -1;
207 }
208
209 static void CldStillIdle(void *dummy, double curT)
210 {
211     if(CmiMyPe() == 0) 
212     {
213 #if YH_DEBUG
214         CmiPrintf(" Processor %d is idle, queue size=%d \n", CmiMyPe(), heap_size(&CpvAccess(CldManagerLoadQueue)) );
215 #endif
216         return;
217     }else
218     {
219 #if YH_DEBUG
220         CmiPrintf("Processor %d, has task number of %d\n", CmiMyPe(), CpvAccess(CldData)->load); 
221 #endif
222     }
223
224     int i;
225     double startT;
226     requestmsg msg;
227     CldProcInfo  cldData = CpvAccess(CldData);
228     double now = curT;
229     double lt = cldData->lastCheck;
230    
231     cldData->load  = 0;
232     msg.notidle = 0;
233     if ((lt!=-1 && now-lt< PERIOD*0.001) ) return;
234 #if YH_DEBUG
235     CmiPrintf("Step 1000: processor %d task is already zero ", CmiMyPe());
236 #endif
237
238     cldData->lastCheck = now;
239     msg.from_pe = CmiMyPe();
240     CmiSetHandler(&msg, CpvAccess(CldAskLoadHandlerIndex));
241
242     cldData->sent = 1;
243 #if YH_DEBUG
244     CmiPrintf("Step 1000: processor %d task is already zero sentidle=%d", CmiMyPe(), (&msg)->notidle);
245 #endif
246     CmiSyncSend(0, sizeof(requestmsg), &msg);
247 }
248 void CldReadytoExec(void *msg)
249 {
250
251     CldProcInfo  cldData = CpvAccess(CldData);
252     CldRestoreHandler(msg);
253     CmiHandleMessage(msg);
254     cldData->load = cldData->load - 1;
255
256     requestmsg r_msg;
257
258     r_msg.notidle = 1;
259     r_msg.from_pe = CmiMyPe();
260     CmiSetHandler(&r_msg, CpvAccess(CldAskLoadHandlerIndex));
261     CmiSyncSend(0, sizeof(requestmsg), &r_msg);
262
263 #if YH_DEBUG
264     CmiPrintf(" Step final: message is handled on processor %d, task left=%d", CmiMyPe(), cldData->load);
265 #endif
266 }
267 void HigherPriorityWork(void *msg)
268 {
269     //wrap this msg with token and put it into token queue
270     
271     CldInfoFn ifn;
272     CldPackFn pfn;
273     int len, queueing, priobits; 
274     unsigned int *prioptr;
275     CldProcInfo  cldData = CpvAccess(CldData);
276     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
277     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
278     CldRestoreHandler(msg);
279     CldSwitchHandler(msg, CpvAccess(CldReadytoExecHandlerIndex));
280     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
281     cldData->load = cldData->load  + 1;
282
283 #if YH_DEBUG
284     CmiPrintf(" Step 3:  processor %d, Task arrives and put it into charm++ queue, prior=%u Timer=%f\n", CmiMyPe(), *prioptr, CmiTimer());
285 #endif
286 }
287
288
289 void CldEnqueue(int pe, void *msg, int infofn)
290 {
291   int len, queueing, priobits, avg; 
292   unsigned int *prioptr;
293   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
294   CldPackFn pfn;
295  
296   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
297   CmiSetInfo(msg,infofn);
298 #if YH_DEBUG
299   CmiPrintf(" Step 1: Creation New msg on pe %d priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(),  *prioptr, CmiTimer(), len);
300 #endif
301
302   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
303       pe = CmiMyPe();
304     /* always pack the message because the message may be move away
305        to a different processor later by CldGetToken() */
306       CldSwitchHandler(msg, CpvAccess(CldstorecharemsgHandlerIndex));
307       if(pe == 0)
308       {
309           CldStoreCharemsg(msg);
310       }else{
311           if (pfn && CmiNumNodes()>1) {
312               pfn(&msg);
313               ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
314           }
315 #if YH_DEBUG
316           CmiPrintf(" Step 1-1: Creation New msg on pe%d ==> p0  priority=%u Timer:%f (msg len=%d)\n", CmiMyPe(),  *prioptr, CmiTimer(), len);
317 #endif
318
319           CmiSyncSendAndFree(0, len, msg);
320       }
321   }else if((pe == CmiMyPe()) || (CmiNumPes() == 1) ) {
322   
323       CsdEnqueueGeneral(msg, CQS_QUEUEING_IFIFO, priobits, prioptr);
324   }else {
325       if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
326           pfn(&msg);
327           ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
328       }
329       if (pe==CLD_BROADCAST) 
330           CmiSyncBroadcastAndFree(len, msg);
331       else if (pe==CLD_BROADCAST_ALL)
332           CmiSyncBroadcastAllAndFree(len, msg);
333       else CmiSyncSendAndFree(pe, len, msg);
334
335   }
336 }
337
338 void CldHandler(char *msg)
339 {
340   int len, queueing, priobits;
341   unsigned int *prioptr;
342   CldInfoFn ifn; CldPackFn pfn;
343   CldRestoreHandler(msg);
344   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
345   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
346   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
347 }
348
349 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
350 {
351   int len, queueing, priobits,i; 
352   unsigned int *prioptr;
353   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
354   CldPackFn pfn;
355   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
356   if (pfn) {
357     pfn(&msg);
358     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
359   }
360   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
361   CmiSetInfo(msg,infofn);
362
363   CmiSyncMulticastAndFree(grp, len, msg);
364 }
365
366 void  CldOtherInit()
367 {
368
369   CpvInitialize(CldProcInfo, CldData);
370   CpvAccess(CldData) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
371   CpvAccess(CldData)->lastCheck = -1;
372   CpvAccess(CldData)->sent = 0;
373   CpvAccess(CldData)->load = 0;
374 #if 1
375   _lbsteal = 1;//CmiGetArgFlagDesc(argv, "+workstealing", "Charm++> Enable work stealing at idle time");
376   if (_lbsteal) {
377   /* register idle handlers - when idle, keep asking work from neighbors */
378   CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,
379       (CcdVoidFn) CldBeginIdle, NULL);
380   CcdCallOnConditionKeep(CcdPROCESSOR_STILL_IDLE,
381       (CcdVoidFn) CldStillIdle, NULL);
382   CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,
383       (CcdVoidFn) CldEndIdle, NULL);
384     if (CmiMyPe() == 0) 
385       CmiPrintf("Charm++> Work stealing is enabled. \n");
386   }
387 #endif
388     
389
390   if (CmiMyPe() == 0){
391       int numpes = CmiNumPes();
392       CpvAccess(CldSlavesPriorityQueue) = (CldSlavePriorInfo*)CmiAlloc(sizeof(CldSlavePriorInfo) * numpes);
393       int i=0;
394       for(i=0; i<numpes; i++){
395           CpvAccess(CldSlavesPriorityQueue)[i].average_priority = _U_INT_MAX;
396           CpvAccess(CldSlavesPriorityQueue)[i].load = 0;
397       }
398   }
399 }
400
401 void CldModuleInit(char **argv)
402 {
403
404   CpvInitialize(int, CldHandlerIndex);
405   CpvAccess(CldHandlerIndex) = CmiRegisterHandler((CmiHandler)CldHandler);
406   /* Yanhua */
407   CpvInitialize(int, CldAskLoadHandlerIndex);
408   CpvInitialize(int, CldstorecharemsgHandlerIndex);
409   CpvInitialize(int, CldHigherPriorityComesHandlerIndex);
410   CpvInitialize(int, CldReadytoExecHandlerIndex);
411   CpvInitialize(MsgHeap, CldManagerLoadQueue);
412   CpvInitialize(CldSlavePriorInfo*, CldSlavesPriorityQueue);
413   CpvInitialize(void*, CldRequestQueue);
414
415   CpvAccess(CldstorecharemsgHandlerIndex) = CmiRegisterHandler(CldStoreCharemsg);
416   CpvAccess(CldHigherPriorityComesHandlerIndex) = CmiRegisterHandler(HigherPriorityWork);
417   CpvAccess(CldAskLoadHandlerIndex) = CmiRegisterHandler((CmiHandler)CldAskLoadHandler);
418   CpvAccess(CldReadytoExecHandlerIndex) = CmiRegisterHandler((CmiHandler)CldReadytoExec);
419   CpvAccess(CldRequestQueue) = (void *)CqsCreate();
420   CldModuleGeneralInit(argv);
421   
422   CldOtherInit();
423   
424   CpvAccess(CldLoadNotify) = 1;
425   //CpvAccess(tokenqueue)->head->succ = CpvAccess(tokenqueue)->tail;
426
427  
428 }
429
430
431
432 void CldNodeEnqueue(int node, void *msg, int infofn)
433 {
434   int len, queueing, priobits; 
435   unsigned int *prioptr;
436   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
437   CldPackFn pfn;
438   if (node == CLD_ANYWHERE) {
439     node = (((CrnRand()+CmiMyNode())&0x7FFFFFFF)%CmiNumNodes());
440     if (node != CmiMyNode())
441       CpvAccess(CldRelocatedMessages)++;
442   }
443   if (node == CmiMyNode() && !CmiImmIsRunning()) {
444     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
445     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
446   } else {
447     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
448     if (pfn) {
449       pfn(&msg);
450       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
451     }
452     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
453     CmiSetInfo(msg,infofn);
454     if (node==CLD_BROADCAST) { CmiSyncNodeBroadcastAndFree(len, msg); }
455     else if (node==CLD_BROADCAST_ALL){CmiSyncNodeBroadcastAllAndFree(len,msg);}
456     else CmiSyncNodeSendAndFree(node, len, msg);
457   }
458 }
459
460 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
461 {
462   int len, queueing, priobits,i; 
463   unsigned int *prioptr;
464   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
465   CldPackFn pfn;
466   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
467   if (pfn) {
468     pfn(&msg);
469     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
470   }
471   CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
472   CmiSetInfo(msg,infofn);
473
474   CmiSyncListSendAndFree(npes, pes, len, msg);
475 }
476
477
/* Strategy hook; this strategy has nothing to do here. */
void CldCallback()
{
  /* intentionally empty */
}