b97ed3780aa3ffb378a8a1b51b0fc3ebbf8dec0d
[charm.git] / src / conv-core / conv-conds.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "converse.h"
12
13 /**
14  * Structure to hold the requisites for a callback
15  */
16 typedef struct _ccd_callback {
17   CcdVoidFn fn;
18   void *arg;
19   int pe;                       /* the pe that sets the callback */
20 } ccd_callback;
21
22
23
24 /**
25  * An element (a single callback) in a list of callbacks
26  */
27 typedef struct _ccd_cblist_elem {
28   ccd_callback cb;
29   int next;
30   int prev;
31 } ccd_cblist_elem;
32
33
34
35 /**
36  * A list of callbacks stored as an array and handled like a list
37  */
38 typedef struct _ccd_cblist {
39   unsigned int maxlen;
40   unsigned int len;
41   int first, last;
42   int first_free;
43   ccd_cblist_elem *elems;
44   int flag;
45 } ccd_cblist;
46
47
48
49 /** Initialize a list of callbacks. Alloc memory, set counters etc. */
50 static void init_cblist(ccd_cblist *l, unsigned int ml)
51 {
52   int i;
53   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
54   _MEMCHECK(l->elems);
55   for(i=0;i<ml;i++) {
56     l->elems[i].next = i+1;
57     l->elems[i].prev = i-1;
58   }
59   l->elems[ml-1].next = -1;
60   l->len = 0;
61   l->maxlen = ml;
62   l->first = l->last = -1;
63   l->first_free = 0;
64   l->flag = 0;
65 }
66
67
68
69 /** Expand the callback list to a max length of ml */
70 static void expand_cblist(ccd_cblist *l, unsigned int ml)
71 {
72   ccd_cblist_elem *old_elems = l->elems;
73   int i = 0;
74   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
75   _MEMCHECK(l->elems);
76   for(i=0;i<(l->len);i++)
77     l->elems[i] = old_elems[i];
78   free(old_elems);
79   for(i=l->len;i<ml;i++) {
80     l->elems[i].next = i+1;
81     l->elems[i].prev = i-1;
82   }
83   l->elems[ml-1].next = -1;
84   l->elems[l->len].prev = -1;
85   l->maxlen = ml;
86   l->first_free = l->len;
87 }
88
89
90
91 /** Remove element referred to by given list index idx. */
92 static void remove_elem(ccd_cblist *l, int idx)
93 {
94   ccd_cblist_elem *e = l->elems;
95   /* remove lidx from the busy list */
96   if(e[idx].next != (-1))
97     e[e[idx].next].prev = e[idx].prev;
98   if(e[idx].prev != (-1))
99     e[e[idx].prev].next = e[idx].next;
100   if(idx==(l->first)) 
101     l->first = e[idx].next;
102   if(idx==(l->last)) 
103     l->last = e[idx].prev;
104   /* put lidx in the free list */
105   e[idx].prev = -1;
106   e[idx].next = l->first_free;
107   if(e[idx].next != (-1))
108     e[e[idx].next].prev = idx;
109   l->first_free = idx;
110   l->len--;
111 }
112
113
114
115 /** Remove n elements from the beginning of the list. */
116 static void remove_n_elems(ccd_cblist *l, int n)
117 {
118   int i;
119   if(n==0 || (l->len < n))
120     return;
121   for(i=0;i<n;i++) {
122     remove_elem(l, l->first);
123   }
124 }
125
126
127
128 /** Append callback to the given cblist, and return the index. */
129 static int append_elem(ccd_cblist *l, CcdVoidFn fn, void *arg, int pe)
130 {
131   register int idx;
132   register ccd_cblist_elem *e;
133   if(l->len == l->maxlen)
134     expand_cblist(l, l->maxlen*2);
135   idx = l->first_free;
136   e = l->elems;
137   l->first_free = e[idx].next;
138   e[idx].next = -1;
139   e[idx].prev = l->last;
140   if(l->first == (-1))
141     l->first = idx;
142   if(l->last != (-1))
143     e[l->last].next = idx;
144   l->last = idx;
145   e[idx].cb.fn = fn;
146   e[idx].cb.arg = arg;
147   e[idx].cb.pe = pe;
148   l->len++;
149   return idx;
150 }
151
152
153
154 /**
155  * Trigger the callbacks in the provided callback list and *retain* them
156  * after they are called. 
157  *
158  * Callbacks that are added after this function is started (e.g. callbacks 
159  * registered from other callbacks) are ignored. 
160  * @note: It is illegal to cancel callbacks from within ccd callbacks.
161  */
162 static void call_cblist_keep(ccd_cblist *l,double curWallTime)
163 {
164   int i, len = l->len, idx;
165   for(i=0, idx=l->first;i<len;i++) {
166     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
167     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
168     CmiSwitchToPE(old);
169     idx = l->elems[idx].next;
170   }
171 }
172
173
174
175 /**
176  * Trigger the callbacks in the provided callback list and *remove* them
177  * from the list after they are called.
178  *
179  * Callbacks that are added after this function is started (e.g. callbacks 
180  * registered from other callbacks) are ignored. 
181  * @note: It is illegal to cancel callbacks from within ccd callbacks.
182  */
183 static void call_cblist_remove(ccd_cblist *l,double curWallTime)
184 {
185   int i, len = l->len, idx;
186   /* reentrant */
187   if (l->flag) return;
188   l->flag = 1;
189 #if ! CMK_BIGSIM_CHARM
190   for(i=0, idx=l->first;i<len;i++) {
191     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
192     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
193     CmiSwitchToPE(old);
194     idx = l->elems[idx].next;
195   }
196 #else
197   for(i=0, idx=l->last;i<len;i++) {
198     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
199     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
200     CmiSwitchToPE(old);
201     idx = l->elems[idx].prev;
202   }
203 #endif
204   remove_n_elems(l,len);
205   l->flag = 0;
206 }
207
208
209
210 #define CBLIST_INIT_LEN   8
211 #define MAXNUMCONDS       512
212
213 /**
214  * Lists of conditional callbacks that are maintained by the scheduler
215  */
216 typedef struct {
217   ccd_cblist condcb[MAXNUMCONDS];
218   ccd_cblist condcb_keep[MAXNUMCONDS];
219 } ccd_cond_callbacks;
220
221 /***/
222 CpvStaticDeclare(ccd_cond_callbacks, conds);   
223
224
225
226 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
227 #define CCD_PERIODIC_MAX 11
228 const static double periodicCallInterval[CCD_PERIODIC_MAX]=
229 {0.001, 0.010, 0.100, 1.0, 10.0, 60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
230
231 /**
232  * List of periodic callbacks maintained by the scheduler
233  */
234 typedef struct {
235         int nSkip;/*Number of opportunities to skip*/
236         double lastCheck;/*Time of last check*/
237         double nextCall[CCD_PERIODIC_MAX];
238 } ccd_periodic_callbacks;
239
240 /** */
241 CpvStaticDeclare(ccd_periodic_callbacks, pcb);
242 CpvDeclare(int, _ccd_numchecks);
243
244
245
246 #define MAXTIMERHEAPENTRIES       256
247
248 /**
249  * Structure used to manage callbacks in a heap
250  */
251 typedef struct {
252     double time;
253     ccd_callback cb;
254 } ccd_heap_elem;
255
256
257 /* Note : The heap is only stored in elements ccd_heap[0] to 
258  * ccd_heap[ccd_heaplen]
259  */
260
261 /** An array of time-scheduled callbacks managed as a heap */
262 CpvStaticDeclare(ccd_heap_elem*, ccd_heap); 
263 /** The length of the callback heap */
264 CpvStaticDeclare(int, ccd_heaplen);
265 /** The max allowed length of the callback heap */
266 CpvStaticDeclare(int, ccd_heapmaxlen);
267
268
269
270 /** Swap two elements on the heap */
271 static void ccd_heap_swap(int index1, int index2)
272 {
273   ccd_heap_elem *h = CpvAccess(ccd_heap);
274   ccd_heap_elem temp;
275   
276   temp = h[index1];
277   h[index1] = h[index2];
278   h[index2] = temp;
279 }
280
281
282
283 /**
284  * Expand the ccd_heap to make more room.
285  *
286  * Double the heap size and copy everything over. Initial 256 is reasonably 
287  * big, so expanding won't happen often.
288  *
289  * Had a bug previously due to late expansion, should work now - Gengbin 12/4/03
290 */
291 static void expand_ccd_heap()
292 {
293   int i;
294   int oldlen = CpvAccess(ccd_heapmaxlen);
295   int newlen = oldlen*2;
296   ccd_heap_elem *newheap;
297
298   CmiPrintf("[%d] Warning: ccd_heap expand from %d to %d\n", CmiMyPe(),oldlen, newlen);
299
300   newheap = (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(newlen+1));
301   _MEMCHECK(newheap);
302   /* need to copy the second half part ??? */
303   for (i=0; i<=oldlen; i++) {
304     newheap[i] = CpvAccess(ccd_heap)[i];
305     newheap[i+newlen] = CpvAccess(ccd_heap)[i+oldlen];
306   }
307   free(CpvAccess(ccd_heap));
308   CpvAccess(ccd_heap) = newheap;
309   CpvAccess(ccd_heapmaxlen) = newlen;
310 }
311
312
313
314 /**
315  * Insert a new callback into the heap
316  */
317 static void ccd_heap_insert(double t, CcdVoidFn fnp, void *arg, int pe)
318 {
319   int child, parent;
320   ccd_heap_elem *h;
321   
322   if(CpvAccess(ccd_heaplen) >= CpvAccess(ccd_heapmaxlen)) {
323 /* CmiAbort("Heap overflow (InsertInHeap), exiting...\n"); */
324     expand_ccd_heap();
325   } 
326
327   h = CpvAccess(ccd_heap);
328
329   {
330     ccd_heap_elem *e = &(h[++CpvAccess(ccd_heaplen)]);
331     e->time = t;
332     e->cb.fn = fnp;
333     e->cb.arg = arg;
334     e->cb.pe = pe;
335     child  = CpvAccess(ccd_heaplen);    
336     parent = child / 2;
337     while((parent>0) && (h[child].time<h[parent].time)) {
338             ccd_heap_swap(child, parent);
339             child  = parent;
340             parent = parent / 2;
341     }
342   }
343 }
344
345
346
347 /**
348  * Remove the top of the heap
349  */
350 static void ccd_heap_remove(void)
351 {
352   int parent,child;
353   ccd_heap_elem *h = CpvAccess(ccd_heap);
354   
355   parent = 1;
356   if(CpvAccess(ccd_heaplen)>0) {
357     /* put deleted value at end of heap */
358     ccd_heap_swap(1,CpvAccess(ccd_heaplen)); 
359     CpvAccess(ccd_heaplen)--;
360     if(CpvAccess(ccd_heaplen)) {
361       /* if any left, then bubble up values */
362             child = 2 * parent;
363             while(child <= CpvAccess(ccd_heaplen)) {
364               if(((child + 1) <= CpvAccess(ccd_heaplen))  &&
365                        (h[child].time > h[child+1].time))
366                 child++; /* use the smaller of the two */
367               if(h[parent].time <= h[child].time) 
368                       break;
369               ccd_heap_swap(parent,child);
370               parent  = child;      /* go down the tree one more step */
371               child  = 2 * child;
372       }
373     }
374   } 
375 }
376
377
378
379 /**
380  * Identify any (over)due callbacks that were scheduled
381  * and trigger them. 
382  */
383 static void ccd_heap_update(double curWallTime)
384 {
385   ccd_heap_elem *h = CpvAccess(ccd_heap);
386   ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
387   int i,ne=0;
388   /* Pull out all expired heap entries */
389   while ((CpvAccess(ccd_heaplen)>0) && (h[1].time<curWallTime)) {
390     e[ne++]=h[1];
391     ccd_heap_remove();
392   }
393   /* Now execute those heap entries.  This must be
394      separated from the removal phase because executing
395      an entry may change the heap. 
396   */
397   for (i=0;i<ne;i++) {
398 /*
399       ccd_heap_elem *h = CpvAccess(ccd_heap);
400       ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
401 */
402       int old = CmiSwitchToPE(e[i].cb.pe);
403       (*(e[i].cb.fn))(e[i].cb.arg,curWallTime);
404       CmiSwitchToPE(old);
405   }
406 }
407
408
409
410 void CcdCallBacksReset(void *ignored,double curWallTime);
411
412 /**
413  * Initialize the callback containers
414  */
415 void CcdModuleInit(void)
416 {
417    int i;
418    double curTime;
419    CpvInitialize(ccd_heap_elem*, ccd_heap);
420    CpvInitialize(ccd_cond_callbacks, conds);
421    CpvInitialize(ccd_periodic_callbacks, pcb);
422    CpvInitialize(int, ccd_heaplen);
423    CpvInitialize(int, ccd_heapmaxlen);
424    CpvInitialize(int, _ccd_numchecks);
425
426    CpvAccess(ccd_heaplen) = 0;
427    CpvAccess(ccd_heapmaxlen) = MAXTIMERHEAPENTRIES;
428    CpvAccess(ccd_heap) = 
429      (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(MAXTIMERHEAPENTRIES + 1));
430    _MEMCHECK(CpvAccess(ccd_heap));
431    for(i=0;i<MAXNUMCONDS;i++) {
432      init_cblist(&(CpvAccess(conds).condcb[i]), CBLIST_INIT_LEN);
433      init_cblist(&(CpvAccess(conds).condcb_keep[i]), CBLIST_INIT_LEN);
434    }
435    CpvAccess(_ccd_numchecks) = 1;
436    CpvAccess(pcb).nSkip = 1;
437    curTime=CmiWallTimer();
438    CpvAccess(pcb).lastCheck = curTime;
439    for (i=0;i<CCD_PERIODIC_MAX;i++)
440            CpvAccess(pcb).nextCall[i]=curTime+periodicCallInterval[i];
441    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,CcdCallBacksReset,0);
442    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,CcdCallBacksReset,0);
443 }
444
445
446
447 /**
448  * Register a callback function that will be triggered when the specified
449  * condition is raised the next time
450  */
451 int CcdCallOnCondition(int condnum, CcdVoidFn fnp, void *arg)
452 {
453   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, CcdIGNOREPE);
454
455
456 /** 
457  * Register a callback function that will be triggered on the specified PE
458  * when the specified condition is raised the next time 
459  */
460 int CcdCallOnConditionOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
461 {
462   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, pe);
463
464
465 /**
466  * Register a callback function that will be triggered *whenever* the specified
467  * condition is raised
468  */
469 int CcdCallOnConditionKeep(int condnum, CcdVoidFn fnp, void *arg)
470 {
471   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, CcdIGNOREPE);
472
473
474 /**
475  * Register a callback function that will be triggered on the specified PE
476  * *whenever* the specified condition is raised
477  */
478 int CcdCallOnConditionKeepOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
479 {
480   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, pe);
481
482
483
484 /**
485  * Cancel a previously registered conditional callback
486  */
487 void CcdCancelCallOnCondition(int condnum, int idx)
488 {
489   remove_elem(&(CpvAccess(conds).condcb[condnum]), idx);
490 }
491
492
493 /**
494  * Cancel a previously registered conditional callback
495  */
496 void CcdCancelCallOnConditionKeep(int condnum, int idx)
497 {
498   remove_elem(&(CpvAccess(conds).condcb_keep[condnum]), idx);
499 }
500
501
502 /**
503  * Register a callback function that will be triggered on the specified PE
504  * after a minimum delay of deltaT
505  */
506 void CcdCallFnAfterOnPE(CcdVoidFn fnp, void *arg, double deltaT, int pe)
507 {
508     double ctime  = CmiWallTimer();
509     double tcall = ctime + deltaT/1000.0;
510     ccd_heap_insert(tcall, fnp, arg, pe);
511
512
513
514 /**
515  * Register a callback function that will be triggered after a minimum 
516  * delay of deltaT
517  */
518 void CcdCallFnAfter(CcdVoidFn fnp, void *arg, double deltaT)
519 {
520     CcdCallFnAfterOnPE(fnp, arg, deltaT, CcdIGNOREPE);
521
522
523
524 /**
525  * Raise a condition causing all registered callbacks corresponding to 
526  * that condition to be triggered
527  */
528 void CcdRaiseCondition(int condnum)
529 {
530   double curWallTime=CmiWallTimer();
531   call_cblist_remove(&(CpvAccess(conds).condcb[condnum]),curWallTime);
532   call_cblist_keep(&(CpvAccess(conds).condcb_keep[condnum]),curWallTime);
533 }
534
535
536 /* 
537  * Trigger callbacks periodically, and also the time-indexed
538  * functions if their time has arrived
539  */
540 void CcdCallBacks(void)
541 {
542   int i;
543   ccd_periodic_callbacks *o=&CpvAccess(pcb);
544   
545   /* Figure out how many times to skip Ccd processing */
546   double curWallTime = CmiWallTimer();
547
548   unsigned int nSkip=o->nSkip;
549 #if 1
550 /* Dynamically adjust the number of messages to skip */
551   double elapsed = curWallTime - o->lastCheck;
552 #define targetElapsed 5.0e-3
553   if (elapsed<targetElapsed) nSkip*=2; /* too short: process more */
554   else /* elapsed>targetElapsed */ nSkip/=2; /* too long: process fewer */
555   
556 /* Keep skipping within a sensible range */
557 #define minSkip 1u
558 #define maxSkip 20u
559   if (nSkip<minSkip) nSkip=minSkip;
560   else if (nSkip>maxSkip) nSkip=maxSkip;
561 #else
562 /* Always skip a fixed number of messages */
563   nSkip=1;
564 #endif
565
566   CpvAccess(_ccd_numchecks)=o->nSkip=nSkip;
567   o->lastCheck=curWallTime;
568   
569   ccd_heap_update(curWallTime);
570   
571   for (i=0;i<CCD_PERIODIC_MAX;i++) 
572     if (o->nextCall[i]<=curWallTime) {
573       CcdRaiseCondition(CcdPERIODIC+i);
574       o->nextCall[i]=curWallTime+periodicCallInterval[i];
575     }
576     else 
577       break; /*<- because intervals are multiples of one another*/
578
579
580
581
582 /**
583  * Called when something drastic changes-- restart ccd_num_checks
584  */
585 void CcdCallBacksReset(void *ignored,double curWallTime)
586 {
587   ccd_periodic_callbacks *o=&CpvAccess(pcb);
588   CpvAccess(_ccd_numchecks)=o->nSkip=1;
589   o->lastCheck=curWallTime;
590 }
591
592