doc: Add serial to list of ci file reserved words
[charm.git] / src / conv-core / conv-conds.c
1 #include <stdio.h>
2 #include <stdlib.h>
3
4 #include "converse.h"
5
6 /**
7  * Structure to hold the requisites for a callback
8  */
9 typedef struct _ccd_callback {
10   CcdVoidFn fn;
11   void *arg;
12   int pe;                       /* the pe that sets the callback */
13 } ccd_callback;
14
15
16
17 /**
18  * An element (a single callback) in a list of callbacks
19  */
20 typedef struct _ccd_cblist_elem {
21   ccd_callback cb;
22   int next;
23   int prev;
24 } ccd_cblist_elem;
25
26
27
28 /**
29  * A list of callbacks stored as an array and handled like a list
30  */
31 typedef struct _ccd_cblist {
32   unsigned int maxlen;
33   unsigned int len;
34   int first, last;
35   int first_free;
36   ccd_cblist_elem *elems;
37   int flag;
38 } ccd_cblist;
39
40
41
42 /** Initialize a list of callbacks. Alloc memory, set counters etc. */
43 static void init_cblist(ccd_cblist *l, unsigned int ml)
44 {
45   int i;
46   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
47   _MEMCHECK(l->elems);
48   for(i=0;i<ml;i++) {
49     l->elems[i].next = i+1;
50     l->elems[i].prev = i-1;
51   }
52   l->elems[ml-1].next = -1;
53   l->len = 0;
54   l->maxlen = ml;
55   l->first = l->last = -1;
56   l->first_free = 0;
57   l->flag = 0;
58 }
59
60
61
62 /** Expand the callback list to a max length of ml */
63 static void expand_cblist(ccd_cblist *l, unsigned int ml)
64 {
65   ccd_cblist_elem *old_elems = l->elems;
66   int i = 0;
67   l->elems = (ccd_cblist_elem*) malloc(ml*sizeof(ccd_cblist_elem));
68   _MEMCHECK(l->elems);
69   for(i=0;i<(l->len);i++)
70     l->elems[i] = old_elems[i];
71   free(old_elems);
72   for(i=l->len;i<ml;i++) {
73     l->elems[i].next = i+1;
74     l->elems[i].prev = i-1;
75   }
76   l->elems[ml-1].next = -1;
77   l->elems[l->len].prev = -1;
78   l->maxlen = ml;
79   l->first_free = l->len;
80 }
81
82
83
84 /** Remove element referred to by given list index idx. */
85 static void remove_elem(ccd_cblist *l, int idx)
86 {
87   ccd_cblist_elem *e = l->elems;
88   /* remove lidx from the busy list */
89   if(e[idx].next != (-1))
90     e[e[idx].next].prev = e[idx].prev;
91   if(e[idx].prev != (-1))
92     e[e[idx].prev].next = e[idx].next;
93   if(idx==(l->first)) 
94     l->first = e[idx].next;
95   if(idx==(l->last)) 
96     l->last = e[idx].prev;
97   /* put lidx in the free list */
98   e[idx].prev = -1;
99   e[idx].next = l->first_free;
100   if(e[idx].next != (-1))
101     e[e[idx].next].prev = idx;
102   l->first_free = idx;
103   l->len--;
104 }
105
106
107
108 /** Remove n elements from the beginning of the list. */
109 static void remove_n_elems(ccd_cblist *l, int n)
110 {
111   int i;
112   if(n==0 || (l->len < n))
113     return;
114   for(i=0;i<n;i++) {
115     remove_elem(l, l->first);
116   }
117 }
118
119
120
121 /** Append callback to the given cblist, and return the index. */
122 static int append_elem(ccd_cblist *l, CcdVoidFn fn, void *arg, int pe)
123 {
124   register int idx;
125   register ccd_cblist_elem *e;
126   if(l->len == l->maxlen)
127     expand_cblist(l, l->maxlen*2);
128   idx = l->first_free;
129   e = l->elems;
130   l->first_free = e[idx].next;
131   e[idx].next = -1;
132   e[idx].prev = l->last;
133   if(l->first == (-1))
134     l->first = idx;
135   if(l->last != (-1))
136     e[l->last].next = idx;
137   l->last = idx;
138   e[idx].cb.fn = fn;
139   e[idx].cb.arg = arg;
140   e[idx].cb.pe = pe;
141   l->len++;
142   return idx;
143 }
144
145
146
147 /**
148  * Trigger the callbacks in the provided callback list and *retain* them
149  * after they are called. 
150  *
151  * Callbacks that are added after this function is started (e.g. callbacks 
152  * registered from other callbacks) are ignored. 
153  * @note: It is illegal to cancel callbacks from within ccd callbacks.
154  */
155 static void call_cblist_keep(ccd_cblist *l,double curWallTime)
156 {
157   int i, len = l->len, idx;
158   for(i=0, idx=l->first;i<len;i++) {
159     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
160     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
161     CmiSwitchToPE(old);
162     idx = l->elems[idx].next;
163   }
164 }
165
166
167
168 /**
169  * Trigger the callbacks in the provided callback list and *remove* them
170  * from the list after they are called.
171  *
172  * Callbacks that are added after this function is started (e.g. callbacks 
173  * registered from other callbacks) are ignored. 
174  * @note: It is illegal to cancel callbacks from within ccd callbacks.
175  */
176 static void call_cblist_remove(ccd_cblist *l,double curWallTime)
177 {
178   int i, len = l->len, idx;
179   /* reentrant */
180   if (l->flag) return;
181   l->flag = 1;
182 #if ! CMK_BIGSIM_CHARM
183   for(i=0, idx=l->first;i<len;i++) {
184     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
185     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
186     CmiSwitchToPE(old);
187     idx = l->elems[idx].next;
188   }
189 #else
190   for(i=0, idx=l->last;i<len;i++) {
191     int old = CmiSwitchToPE(l->elems[idx].cb.pe);
192     (*(l->elems[idx].cb.fn))(l->elems[idx].cb.arg,curWallTime);
193     CmiSwitchToPE(old);
194     idx = l->elems[idx].prev;
195   }
196 #endif
197   remove_n_elems(l,len);
198   l->flag = 0;
199 }
200
201
202
203 #define CBLIST_INIT_LEN   8
204 #define MAXNUMCONDS       512
205
206 /**
207  * Lists of conditional callbacks that are maintained by the scheduler
208  */
209 typedef struct {
210   ccd_cblist condcb[MAXNUMCONDS];
211   ccd_cblist condcb_keep[MAXNUMCONDS];
212 } ccd_cond_callbacks;
213
214 /***/
215 CpvStaticDeclare(ccd_cond_callbacks, conds);   
216
217
218
219 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
220 #define CCD_PERIODIC_MAX 13
221 const static double periodicCallInterval[CCD_PERIODIC_MAX]=
222 {0.001, 0.010, 0.100, 1.0, 5.0, 10.0, 60.0, 2*60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
223
224 /**
225  * List of periodic callbacks maintained by the scheduler
226  */
227 typedef struct {
228         int nSkip;/*Number of opportunities to skip*/
229         double lastCheck;/*Time of last check*/
230         double nextCall[CCD_PERIODIC_MAX];
231 } ccd_periodic_callbacks;
232
233 /** */
234 CpvStaticDeclare(ccd_periodic_callbacks, pcb);
235 CpvDeclare(int, _ccd_numchecks);
236
237
238
239 #define MAXTIMERHEAPENTRIES       256
240
241 /**
242  * Structure used to manage callbacks in a heap
243  */
244 typedef struct {
245     double time;
246     ccd_callback cb;
247 } ccd_heap_elem;
248
249
250 /* Note : The heap is only stored in elements ccd_heap[0] to 
251  * ccd_heap[ccd_heaplen]
252  */
253
254 /** An array of time-scheduled callbacks managed as a heap */
255 CpvStaticDeclare(ccd_heap_elem*, ccd_heap); 
256 /** The length of the callback heap */
257 CpvStaticDeclare(int, ccd_heaplen);
258 /** The max allowed length of the callback heap */
259 CpvStaticDeclare(int, ccd_heapmaxlen);
260
261
262
263 /** Swap two elements on the heap */
264 static void ccd_heap_swap(int index1, int index2)
265 {
266   ccd_heap_elem *h = CpvAccess(ccd_heap);
267   ccd_heap_elem temp;
268   
269   temp = h[index1];
270   h[index1] = h[index2];
271   h[index2] = temp;
272 }
273
274
275
276 /**
277  * Expand the ccd_heap to make more room.
278  *
279  * Double the heap size and copy everything over. Initial 256 is reasonably 
280  * big, so expanding won't happen often.
281  *
282  * Had a bug previously due to late expansion, should work now - Gengbin 12/4/03
283 */
284 static void expand_ccd_heap()
285 {
286   int i;
287   int oldlen = CpvAccess(ccd_heapmaxlen);
288   int newlen = oldlen*2;
289   ccd_heap_elem *newheap;
290
291   CmiPrintf("[%d] Warning: ccd_heap expand from %d to %d\n", CmiMyPe(),oldlen, newlen);
292
293   newheap = (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(newlen+1));
294   _MEMCHECK(newheap);
295   /* need to copy the second half part ??? */
296   for (i=0; i<=oldlen; i++) {
297     newheap[i] = CpvAccess(ccd_heap)[i];
298     newheap[i+newlen] = CpvAccess(ccd_heap)[i+oldlen];
299   }
300   free(CpvAccess(ccd_heap));
301   CpvAccess(ccd_heap) = newheap;
302   CpvAccess(ccd_heapmaxlen) = newlen;
303 }
304
305
306
307 /**
308  * Insert a new callback into the heap
309  */
310 static void ccd_heap_insert(double t, CcdVoidFn fnp, void *arg, int pe)
311 {
312   int child, parent;
313   ccd_heap_elem *h;
314   
315   if(CpvAccess(ccd_heaplen) >= CpvAccess(ccd_heapmaxlen)) {
316 /* CmiAbort("Heap overflow (InsertInHeap), exiting...\n"); */
317     expand_ccd_heap();
318   } 
319
320   h = CpvAccess(ccd_heap);
321
322   {
323     ccd_heap_elem *e = &(h[++CpvAccess(ccd_heaplen)]);
324     e->time = t;
325     e->cb.fn = fnp;
326     e->cb.arg = arg;
327     e->cb.pe = pe;
328     child  = CpvAccess(ccd_heaplen);    
329     parent = child / 2;
330     while((parent>0) && (h[child].time<h[parent].time)) {
331             ccd_heap_swap(child, parent);
332             child  = parent;
333             parent = parent / 2;
334     }
335   }
336 }
337
338
339
340 /**
341  * Remove the top of the heap
342  */
343 static void ccd_heap_remove(void)
344 {
345   int parent,child;
346   ccd_heap_elem *h = CpvAccess(ccd_heap);
347   
348   parent = 1;
349   if(CpvAccess(ccd_heaplen)>0) {
350     /* put deleted value at end of heap */
351     ccd_heap_swap(1,CpvAccess(ccd_heaplen)); 
352     CpvAccess(ccd_heaplen)--;
353     if(CpvAccess(ccd_heaplen)) {
354       /* if any left, then bubble up values */
355             child = 2 * parent;
356             while(child <= CpvAccess(ccd_heaplen)) {
357               if(((child + 1) <= CpvAccess(ccd_heaplen))  &&
358                        (h[child].time > h[child+1].time))
359                 child++; /* use the smaller of the two */
360               if(h[parent].time <= h[child].time) 
361                       break;
362               ccd_heap_swap(parent,child);
363               parent  = child;      /* go down the tree one more step */
364               child  = 2 * child;
365       }
366     }
367   } 
368 }
369
370
371
372 /**
373  * Identify any (over)due callbacks that were scheduled
374  * and trigger them. 
375  */
376 static void ccd_heap_update(double curWallTime)
377 {
378   ccd_heap_elem *h = CpvAccess(ccd_heap);
379   ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
380   int i,ne=0;
381   /* Pull out all expired heap entries */
382   while ((CpvAccess(ccd_heaplen)>0) && (h[1].time<curWallTime)) {
383     e[ne++]=h[1];
384     ccd_heap_remove();
385   }
386   /* Now execute those heap entries.  This must be
387      separated from the removal phase because executing
388      an entry may change the heap. 
389   */
390   for (i=0;i<ne;i++) {
391 /*
392       ccd_heap_elem *h = CpvAccess(ccd_heap);
393       ccd_heap_elem *e = h+CpvAccess(ccd_heapmaxlen);
394 */
395       int old = CmiSwitchToPE(e[i].cb.pe);
396       (*(e[i].cb.fn))(e[i].cb.arg,curWallTime);
397       CmiSwitchToPE(old);
398   }
399 }
400
401
402
403 void CcdCallBacksReset(void *ignored,double curWallTime);
404
405 /**
406  * Initialize the callback containers
407  */
408 void CcdModuleInit(void)
409 {
410    int i;
411    double curTime;
412    CpvInitialize(ccd_heap_elem*, ccd_heap);
413    CpvInitialize(ccd_cond_callbacks, conds);
414    CpvInitialize(ccd_periodic_callbacks, pcb);
415    CpvInitialize(int, ccd_heaplen);
416    CpvInitialize(int, ccd_heapmaxlen);
417    CpvInitialize(int, _ccd_numchecks);
418
419    CpvAccess(ccd_heaplen) = 0;
420    CpvAccess(ccd_heapmaxlen) = MAXTIMERHEAPENTRIES;
421    CpvAccess(ccd_heap) = 
422      (ccd_heap_elem*) malloc(sizeof(ccd_heap_elem)*2*(MAXTIMERHEAPENTRIES + 1));
423    _MEMCHECK(CpvAccess(ccd_heap));
424    for(i=0;i<MAXNUMCONDS;i++) {
425      init_cblist(&(CpvAccess(conds).condcb[i]), CBLIST_INIT_LEN);
426      init_cblist(&(CpvAccess(conds).condcb_keep[i]), CBLIST_INIT_LEN);
427    }
428    CpvAccess(_ccd_numchecks) = 1;
429    CpvAccess(pcb).nSkip = 1;
430    curTime=CmiWallTimer();
431    CpvAccess(pcb).lastCheck = curTime;
432    for (i=0;i<CCD_PERIODIC_MAX;i++)
433            CpvAccess(pcb).nextCall[i]=curTime+periodicCallInterval[i];
434    CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,CcdCallBacksReset,0);
435    CcdCallOnConditionKeep(CcdPROCESSOR_END_IDLE,CcdCallBacksReset,0);
436 }
437
438
439
440 /**
441  * Register a callback function that will be triggered when the specified
442  * condition is raised the next time
443  */
444 int CcdCallOnCondition(int condnum, CcdVoidFn fnp, void *arg)
445 {
446   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, CcdIGNOREPE);
447
448
449 /** 
450  * Register a callback function that will be triggered on the specified PE
451  * when the specified condition is raised the next time 
452  */
453 int CcdCallOnConditionOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
454 {
455   return append_elem(&(CpvAccess(conds).condcb[condnum]), fnp, arg, pe);
456
457
458 /**
459  * Register a callback function that will be triggered *whenever* the specified
460  * condition is raised
461  */
462 int CcdCallOnConditionKeep(int condnum, CcdVoidFn fnp, void *arg)
463 {
464   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, CcdIGNOREPE);
465
466
467 /**
468  * Register a callback function that will be triggered on the specified PE
469  * *whenever* the specified condition is raised
470  */
471 int CcdCallOnConditionKeepOnPE(int condnum, CcdVoidFn fnp, void *arg, int pe)
472 {
473   return append_elem(&(CpvAccess(conds).condcb_keep[condnum]), fnp, arg, pe);
474
475
476
477 /**
478  * Cancel a previously registered conditional callback
479  */
480 void CcdCancelCallOnCondition(int condnum, int idx)
481 {
482   remove_elem(&(CpvAccess(conds).condcb[condnum]), idx);
483 }
484
485
486 /**
487  * Cancel a previously registered conditional callback
488  */
489 void CcdCancelCallOnConditionKeep(int condnum, int idx)
490 {
491   remove_elem(&(CpvAccess(conds).condcb_keep[condnum]), idx);
492 }
493
494
495 /**
496  * Register a callback function that will be triggered on the specified PE
497  * after a minimum delay of deltaT
498  */
499 void CcdCallFnAfterOnPE(CcdVoidFn fnp, void *arg, double deltaT, int pe)
500 {
501     double ctime  = CmiWallTimer();
502     double tcall = ctime + deltaT/1000.0;
503     ccd_heap_insert(tcall, fnp, arg, pe);
504
505
506 /**
507  * Register a callback function that will be triggered after a minimum 
508  * delay of deltaT
509  */
510 void CcdCallFnAfter(CcdVoidFn fnp, void *arg, double deltaT)
511 {
512     CcdCallFnAfterOnPE(fnp, arg, deltaT, CcdIGNOREPE);
513
514
515
516 /**
517  * Raise a condition causing all registered callbacks corresponding to 
518  * that condition to be triggered
519  */
520 void CcdRaiseCondition(int condnum)
521 {
522   double curWallTime=CmiWallTimer();
523   call_cblist_remove(&(CpvAccess(conds).condcb[condnum]),curWallTime);
524   call_cblist_keep(&(CpvAccess(conds).condcb_keep[condnum]),curWallTime);
525 }
526
527
528 /* 
529  * Trigger callbacks periodically, and also the time-indexed
530  * functions if their time has arrived
531  */
532 void CcdCallBacks()
533 {
534   int i;
535   ccd_periodic_callbacks *o=&CpvAccess(pcb);
536   
537   /* Figure out how many times to skip Ccd processing */
538   double curWallTime = CmiWallTimer();
539
540   unsigned int nSkip=o->nSkip;
541 #if 1
542 /* Dynamically adjust the number of messages to skip */
543   double elapsed = curWallTime - o->lastCheck;
544 #define targetElapsed 5.0e-3
545   if (elapsed<targetElapsed) nSkip*=2; /* too short: process more */
546   else /* elapsed>targetElapsed */ nSkip/=2; /* too long: process fewer */
547   
548 /* Keep skipping within a sensible range */
549 #define minSkip 1u
550 #define maxSkip 20u
551   if (nSkip<minSkip) nSkip=minSkip;
552   else if (nSkip>maxSkip) nSkip=maxSkip;
553 #else
554 /* Always skip a fixed number of messages */
555   nSkip=1;
556 #endif
557
558   CpvAccess(_ccd_numchecks)=o->nSkip=nSkip;
559   o->lastCheck=curWallTime;
560   
561   ccd_heap_update(curWallTime);
562   
563   for (i=0;i<CCD_PERIODIC_MAX;i++) 
564     if (o->nextCall[i]<=curWallTime) {
565       CcdRaiseCondition(CcdPERIODIC+i);
566       o->nextCall[i]=curWallTime+periodicCallInterval[i];
567     }
568     else 
569       break; /*<- because intervals are multiples of one another*/
570
571
572
573
574 /**
575  * Called when something drastic changes-- restart ccd_num_checks
576  */
577 void CcdCallBacksReset(void *ignored,double curWallTime)
578 {
579   ccd_periodic_callbacks *o=&CpvAccess(pcb);
580   CpvAccess(_ccd_numchecks)=o->nSkip=1;
581   o->lastCheck=curWallTime;
582 }
583
584