Ccd callbacks now take the current wall clock time as their
[charm.git] / src / conv-core / conv-conds.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <stdio.h>
9 #include <stdlib.h>
10
11 #include "converse.h"
12
13 typedef struct _ccd_callback {
14   CcdVoidFn fn;
15   void *arg;
16   int pe;                       /* the pe that sets the callback */
17 } ccd_callback;
18
19 typedef struct _ccd_cblist_elem {
20   ccd_callback cb;
21   int next;
22   int prev;
23 } ccd_cblist_elem;
24
25 typedef struct _ccd_cblist {
26   unsigned int maxlen;
27   unsigned int len;
28   int first, last;
29   int first_free;
30   ccd_cblist_elem *elems;
31 } ccd_cblist;
32
33 /* initializes a callback list to the maximum length of ml.
34  */
35 static void init_cblist(ccd_cblist *l, unsigned int ml)
36 {
37   int i;
38   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
39   _MEMCHECK(l->elems);
40   for(i=0;i<ml;i++) {
41     l->elems[i].next = i+1;
42     l->elems[i].prev = i-1;
43   }
44   l->elems[ml-1].next = -1;
45   l->len = 0;
46   l->maxlen = ml;
47   l->first = l->last = -1;
48   l->first_free = 0;
49 }
50
51 /* expand the callback list to a max length of ml
52  */
53 static void expand_cblist(ccd_cblist *l, unsigned int ml)
54 {
55   ccd_cblist_elem *old_elems = l->elems;
56   int i = 0;
57   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
58   _MEMCHECK(l->elems);
59   for(i=0;i<(l->len);i++)
60     l->elems[i] = old_elems[i];
61   free(old_elems);
62   for(i=l->len;i<ml;i++) {
63     l->elems[i].next = i+1;
64     l->elems[i].prev = i-1;
65   }
66   l->elems[ml-1].next = -1;
67   l->elems[l->len].prev = -1;
68   l->maxlen = ml;
69   l->first_free = l->len;
70 }
71
72 /* remove element referred to by given list index idx.
73  */
74 static void remove_elem(ccd_cblist *l, int idx)
75 {
76   ccd_cblist_elem *e = l->elems;
77   /* remove lidx from the busy list */
78   if(e[idx].next != (-1))
79     e[e[idx].next].prev = e[idx].prev;
80   if(e[idx].prev != (-1))
81     e[e[idx].prev].next = e[idx].next;
82   if(idx==(l->first)) 
83     l->first = e[idx].next;
84   if(idx==(l->last)) 
85     l->last = e[idx].prev;
86   /* put lidx in the free list */
87   e[idx].prev = -1;
88   e[idx].next = l->first_free;
89   if(e[idx].next != (-1))
90     e[e[idx].next].prev = idx;
91   l->first_free = idx;
92   l->len--;
93 }
94
95 /* remove n elements from the beginning of the list.
96  */
97 static void remove_n_elems(ccd_cblist *l, int n)
98 {
99   int i;
100   if(n==0 || (l->len < n))
101     return;
102   for(i=0;i<n;i++) {
103     remove_elem(l, l->first);
104   }
105 }
106
107 /* append callback to the given cblist, and return the index.
108  */
109 static int append_elem(ccd_cblist *l, CcdVoidFn fn, void *arg, int pe)
110 {
111   register int idx;
112   register ccd_cblist_elem *e;
113   if(l->len == l->maxlen)
114     expand_cblist(l, l->maxlen*2);
115   idx = l->first_free;
116   e = l->elems;
117   l->first_free = e[idx].next;
118   e[idx].next = -1;
119   e[idx].prev = l->last;
120   if(l->first == (-1))
121     l->first = idx;
122   if(l->last != (-1))
123     e[l->last].next = idx;
124   l->last = idx;
125   e[idx].cb.fn = fn;
126   e[idx].cb.arg = arg;
127   e[idx].cb.pe = pe;
128   l->len++;
129   return idx;
130 }
131
132 /* call functions on the cblist. functions that are added after the call 
133  * cblist is started (e.g. callbacks registered from other callbacks) are 
134  * ignored. callbacks are kept in the list even after they are called.
135  * Note: it is illegal to cancel callbacks from within ccd callbacks.
136  */
137 static void call_cblist_keep(ccd_cblist *l,double curWallTime)
138 {
139   int i, len = l->len, idx;
140   for(i=0, idx=l->first;i<len;i++) {
141     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
142     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
143     CmiSwitchToPE(old);
144     idx = l->elems[idx].next;
145   }
146 }
147
148 /* call functions on the cblist. functions that are added after the call 
149  * cblist is started (e.g. callbacks registered from other callbacks) are 
150  * ignored. callbacks are removed from the list after they are called.
151  * Note: it is illegal to cancel callbacks from within ccd callbacks.
152  */
153 static void call_cblist_remove(ccd_cblist *l,double curWallTime)
154 {
155   int i, len = l->len, idx;
156 #if ! CMK_BLUEGENE_CHARM
157   for(i=0, idx=l->first;i<len;i++) {
158     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
159     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
160     CmiSwitchToPE(old);
161     idx = l->elems[idx].next;
162   }
163 #else
164   for(i=0, idx=l->last;i<len;i++) {
165     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
166     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
167     CmiSwitchToPE(old);
168     idx = l->elems[idx].prev;
169   }
170 #endif
171   remove_n_elems(l,len);
172 }
173
174 #define CBLIST_INIT_LEN   8
175 #define MAXNUMCONDS       512
176
177 typedef struct {
178   ccd_cblist condcb[MAXNUMCONDS];
179   ccd_cblist condcb_keep[MAXNUMCONDS];
180 } ccd_cond_callbacks;
181
182 CpvStaticDeclare(ccd_cond_callbacks, conds);   
183
184 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
185 #define CCD_PERIODIC_MAX 10
186 const static double periodicCallInterval[CCD_PERIODIC_MAX]=
187 {0.001, 0.010, 0.100, 1.0, 10.0, 60.0,10*60.0, 3600.0, 12*3600.0, 24*3600.0};
188
189 typedef struct {
190         int nSkip;/*Number of opportunities to skip*/
191         double lastCheck;/*Time of last check*/
192         double nextCall[CCD_PERIODIC_MAX];
193 } ccd_periodic_callbacks;
194
195 CpvStaticDeclare(ccd_periodic_callbacks, pcb);
196 CpvDeclare(int, _ccd_numchecks);
197
198 #define MAXTIMERHEAPENTRIES       256
199
200 typedef struct {
201     double time;
202     ccd_callback cb;
203 } ccd_heap_elem;
204
205
206 /* Note : The heap is only stored in elements ccd_heap[0] to 
207  * ccd_heap[ccd_heaplen]
208  */
209
210 CpvStaticDeclare(ccd_heap_elem*, ccd_heap); 
211 CpvStaticDeclare(int, ccd_heaplen);
212 CpvStaticDeclare(int, ccd_heapmaxlen);
213
214 static void ccd_heap_swap(int index1, int index2)
215 {
216   ccd_heap_elem *h = CpvAccess(ccd_heap);
217   ccd_heap_elem temp;
218   
219   temp = h[index1];
220   h[index1] = h[index2];
221   h[index2] = temp;
222 }
223
224 /*
225  expand the ccd_heap, double the heap size and copy everything over.
226  Initial 256 is reasonably big, so expanding won't happen often.
227  Had a bug previously due to late expansion, should work now - Gengbin 12/4/03
228 */
229 static void expand_ccd_heap()
230 {
231   int i;
232   int oldlen = CpvAccess(ccd_heapmaxlen);
233   int newlen = oldlen*2;
234   ccd_heap_elem *newheap;
235
236 CmiPrintf("[%d] Warning: ccd_heap expand from %d to %d\n", CmiMyPe(),oldlen, newlen);
237
238   newheap = (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(newlen+1));
239   _MEMCHECK(newheap);
240   /* need to copy the second half part ??? */
241   for (i=0; i<=oldlen; i++) {
242     newheap[i] = CpvAccess(ccd_heap)[i];
243     newheap[i+newlen] = CpvAccess(ccd_heap)[i+oldlen];
244   }
245   free(CpvAccess(ccd_heap));
246   CpvAccess(ccd_heap) = newheap;
247   CpvAccess(ccd_heapmaxlen) = newlen;
248 }
249
250 static void ccd_heap_insert(double t, CcdVoidFn fnp, void *arg, int pe)
251 {
252   int child, parent;
253   ccd_heap_elem *h;
254   
255   if(CpvAccess(ccd_heaplen) >= CpvAccess(ccd_heapmaxlen)) {
256 /* CmiAbort("Heap overflow (InsertInHeap), exiting...\n"); */
257     expand_ccd_heap();
258   } 
259
260   h = CpvAccess(ccd_heap);
261
262   {
263     ccd_heap_elem *e = &(h[++CpvAccess(ccd_heaplen)]);
264     e->time = t;
265     e->cb.fn = fnp;
266     e->cb.arg = arg;
267     e->cb.pe = pe;
268     child  = CpvAccess(ccd_heaplen);    
269     parent = child / 2;
270     while((parent>0) && (h[child].time<h[parent].time)) {
271             ccd_heap_swap(child, parent);
272             child  = parent;
273             parent = parent / 2;
274     }
275   }
276 }
277
278 /* remove the top of the heap
279  */
280 static void ccd_heap_remove(void)
281 {
282   int parent,child;
283   ccd_heap_elem *h = CpvAccess(ccd_heap);
284   
285   parent = 1;
286   if(CpvAccess(ccd_heaplen)>0) {
287     /* put deleted value at end of heap */
288     ccd_heap_swap(1,CpvAccess(ccd_heaplen)); 
289     CpvAccess(ccd_heaplen)--;
290     if(CpvAccess(ccd_heaplen)) {
291       /* if any left, then bubble up values */
292             child = 2 * parent;
293             while(child <= CpvAccess(ccd_heaplen)) {
294               if(((child + 1) <= CpvAccess(ccd_heaplen))  &&
295                        (h[child].time > h[child+1].time))
296                 child++; /* use the smaller of the two */
297               if(h[parent].time <= h[child].time) 
298                       break;
299               ccd_heap_swap(parent,child);
300               parent  = child;      /* go down the tree one more step */
301               child  = 2 * child;
302       }
303     }
304   } 
305 }
306
307 /* If any of the CallFnAfter functions can now be called, call them 
308  */
309 static void ccd_heap_update(double curWallTime)
310 {
311   ccd_heap_elem *h = CpvAccess(ccd_heap);
312   ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
313   int i,ne=0;
314   /* Pull out all expired heap entries */
315   while ((CpvAccess(ccd_heaplen)>0) && (h[1].time<curWallTime)) {
316     e[ne++]=h[1];
317     ccd_heap_remove();
318   }
319   /* Now execute those heap entries.  This must be
320      separated from the removal phase because executing
321      an entry may change the heap. 
322   */
323   for (i=0;i<ne;i++) {
324 /*
325       ccd_heap_elem *h = CpvAccess(ccd_heap);
326       ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
327 */
328       int old = CmiSwitchToPE(e[i].cb.pe);
329       (*(e[i].cb.fn))(e[i].cb.arg,curWallTime);
330       CmiSwitchToPE(old);
331   }
332 }
333
334 void CcdCallBacksReset(void *ignored,double curWallTime);
335
336 void CcdModuleInit(void)
337 {
338    int i;
339    double curTime;
340    CpvInitialize(ccd_heap_elem*, ccd_heap);
341    CpvInitialize(ccd_cond_callbacks, conds);
342    CpvInitialize(ccd_periodic_callbacks, pcb);
343    CpvInitialize(int, ccd_heaplen);
344    CpvInitialize(int, ccd_heapmaxlen);
345    CpvInitialize(int, _ccd_numchecks);
346
347    CpvAccess(ccd_heaplen) = 0;
348    CpvAccess(ccd_heapmaxlen) = MAXTIMERHEAPENTRIES;
349    CpvAccess(ccd_heap) = 
350      (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(MAXTIMERHEAPENTRIES + 1));
351    _MEMCHECK(CpvAccess(ccd_heap));
352    for(i=0;i<MAXNUMCONDS;i++) {
353      init_cblist(&(CpvAccess(conds).condcb[i]), CBLIST_INIT_LEN);
354      init_cblist(&(CpvAccess(conds).condcb_keep[i]), CBLIST_INIT_LEN);
355    }
356    CpvAccess(_ccd_numchecks) = 1;
357    CpvAccess(pcb).nSkip = 1;
358    curTime=CmiWallTimer();
359    CpvAccess(pcb).lastCheck = curTime;
360    for (i=0;i<CCD_PERIODIC_MAX;i++)
361            CpvAccess(pcb).nextCall[i]=curTime+periodicCallInterval[i];
362    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,CcdCallBacksReset,0);
363    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,CcdCallBacksReset,0);
364 }
365
366
367
368 /* Add a function that will be called when a particular condition is raised
369  */
370 int CcdCallOnCondition(int condnum, CcdVoidFn fnp, void *arg)
371 {
372   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, CcdIGNOREPE);
373
374
375 int CcdCallOnConditionOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
376 {
377   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, pe);
378
379
380 int CcdCallOnConditionKeep(int condnum, CcdVoidFn fnp, void *arg)
381 {
382   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, CcdIGNOREPE);
383
384
385 int CcdCallOnConditionKeepOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
386 {
387   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, pe);
388
389
390 void CcdCancelCallOnCondition(int condnum, int idx)
391 {
392   remove_elem(&(CpvAccess(conds).condcb[condnum]), idx);
393 }
394
395 void CcdCancelCallOnConditionKeep(int condnum, int idx)
396 {
397   remove_elem(&(CpvAccess(conds).condcb_keep[condnum]), idx);
398 }
399
400 /* Call the function with the provided argument after a minimum delay of deltaT
401  */
402 void CcdCallFnAfterOnPE(CcdVoidFn fnp, void *arg, unsigned int deltaT, int pe)
403 {
404   double ctime  = CmiWallTimer();
405   double tcall = ctime + (double)deltaT/1000.0;
406   ccd_heap_insert(tcall, fnp, arg, pe);
407
408
409 void CcdCallFnAfter(CcdVoidFn fnp, void *arg, unsigned int deltaT)
410 {
411   CcdCallFnAfterOnPE(fnp, arg, deltaT, CcdIGNOREPE);
412
413
414 /* Call all the functions that are waiting for this condition to be raised
415  */
416 void CcdRaiseCondition(int condnum)
417 {
418   double curWallTime=CmiWallTimer();
419   call_cblist_remove(&(CpvAccess(conds).condcb[condnum]),curWallTime);
420   call_cblist_keep(&(CpvAccess(conds).condcb_keep[condnum]),curWallTime);
421 }
422
423 /* call functions to be called periodically, and also the time-indexed
424  * functions if their time has arrived
425  */
426 void CcdCallBacks(void)
427 {
428   int i;
429   ccd_periodic_callbacks *o=&CpvAccess(pcb);
430   
431   /* Figure out how many times to skip Ccd processing */
432   double curWallTime = CmiWallTimer();
433
434   unsigned int nSkip=o->nSkip;
435 #if 1
436 /* Dynamically adjust the number of messages to skip */
437   double elapsed = curWallTime - o->lastCheck;
438 #define targetElapsed 5.0e-3
439   if (elapsed<targetElapsed) nSkip*=2; /* too short: process more */
440   else /* elapsed>targetElapsed */ nSkip/=2; /* too long: process fewer */
441   
442 /* Keep skipping within a sensible range */
443 #define minSkip 1u
444 #define maxSkip 20u
445   if (nSkip<minSkip) nSkip=minSkip;
446   else if (nSkip>maxSkip) nSkip=maxSkip;
447 #else
448 /* Always skip a fixed number of messages */
449   nSkip=1;
450 #endif
451
452   CpvAccess(_ccd_numchecks)=o->nSkip=nSkip;
453   o->lastCheck=curWallTime;
454   
455   ccd_heap_update(curWallTime);
456   
457   for (i=0;i<CCD_PERIODIC_MAX;i++) 
458     if (o->nextCall[i]<=curWallTime) {
459       CcdRaiseCondition(CcdPERIODIC+i);
460       o->nextCall[i]=curWallTime+periodicCallInterval[i];
461     }
462     else 
463       break; /*<- because intervals are multiples of one another*/
464
465
466 /*Called when something drastic changes-- restart ccd_num_checks
467 */
468 void CcdCallBacksReset(void *ignored,double curWallTime)
469 {
470   ccd_periodic_callbacks *o=&CpvAccess(pcb);
471   CpvAccess(_ccd_numchecks)=o->nSkip=1;
472   o->lastCheck=curWallTime;
473 }
474
475