doc: Add serial to list of ci file reserved words
[charm.git] / src / conv-core / queueing.c
1 #include "queueing.h"
2 #include <converse.h>
3 #include <string.h>
4
5 /** @defgroup CharmScheduler 
6     @brief The portion of Charm++ responsible for scheduling the execution 
7     of Charm++ entry methods
8
9     CqsEnqueueGeneral() is the main function that is responsible for enqueueing 
10     messages. It will store the messages in one of three queues based on the 
11     specified priorities or strategies. The Charm++ message queue is really three 
12     queues, one for positive priorities, one for zero priorities, and one for 
13     negative priorities. The positive and negative priorty queues are actually heaps.
14
15
16     The charm++ messages are only scheduled after the \ref
17     ConverseScheduler "converse message queues" have been emptied:
18
19     - CsdScheduleForever() is the commonly used Converse scheduler loop
20        - CsdNextMessage()
21           - First processes converse message from a  \ref ConverseScheduler "converse queue" if one exists using CdsFifo_Dequeue() 
22           - Then if no converse message was found, call CqsDequeue() which gets a charm++ message to execute if one exists
23
24
25     @file
26     Implementation of queuing data structure functions.
27     @ingroup CharmScheduler
28     
29     @addtogroup CharmScheduler
30     @{
31 */
32
33 #if 1
34 /* I don't see why we need CmiAlloc here, allocated from pinned memory
35  * can be expensive */
36 #define CmiAlloc   malloc
37 #define CmiFree    free
38 #endif
39
40 /** A memory limit threshold for adaptively scheduling */
41 int schedAdaptMemThresholdMB;
42
43
44 /** Initialize a deq */
45 static void CqsDeqInit(_deq d)
46 {
47   d->bgn  = d->space;
48   d->end  = d->space+4;
49   d->head = d->space;
50   d->tail = d->space;
51 }
52
53 /** Double the size of a deq */
54 static void CqsDeqExpand(_deq d)
55 {
56   int rsize = (d->end - d->head);
57   int lsize = (d->head - d->bgn);
58   int oldsize = (d->end - d->bgn);
59   int newsize = (oldsize << 1);
60   void **ovec = d->bgn;
61   void **nvec = (void **)CmiAlloc(newsize * sizeof(void *));
62   memcpy(nvec, d->head, rsize * sizeof(void *));
63   memcpy(nvec+rsize, d->bgn, lsize * sizeof(void *));
64   d->bgn = nvec;
65   d->end = nvec + newsize;
66   d->head = nvec;
67   d->tail = nvec + oldsize;
68   if (ovec != d->space) CmiFree(ovec);
69 }
70
71 /** Insert a data pointer at the tail of a deq */
72 void CqsDeqEnqueueFifo(_deq d, void *data)
73 {
74   void **tail = d->tail;
75   *tail = data;
76   tail++;
77   if (tail == d->end) tail = d->bgn;
78   d->tail = tail;
79   if (tail == d->head) CqsDeqExpand(d);
80 }
81
82 /** Insert a data pointer at the head of a deq */
83 void CqsDeqEnqueueLifo(_deq d, void *data)
84 {
85   void **head = d->head;
86   if (head == d->bgn) head = d->end;
87   head--;
88   *head = data;
89   d->head = head;
90   if (head == d->tail) CqsDeqExpand(d);
91 }
92
93 /** Remove a data pointer from the head of a deq */
94 void *CqsDeqDequeue(_deq d)
95 {
96   void **head;
97   void **tail;
98   void *data;
99   head = d->head;
100   tail = d->tail;
101   if (head == tail) return 0;
102   data = *head;
103   head++;
104   if (head == d->end) head = d->bgn;
105   d->head = head;
106   return data;
107 }
108
109 /** Initialize a Priority Queue */
110 static void CqsPrioqInit(_prioq pq)
111 {
112   int i;
113   pq->heapsize = 100;
114   pq->heapnext = 1;
115   pq->hash_key_size = PRIOQ_TABSIZE;
116   pq->hash_entry_size = 0;
117   pq->heap = (_prioqelt *)CmiAlloc(100 * sizeof(_prioqelt));
118   pq->hashtab = (_prioqelt *)CmiAlloc(pq->hash_key_size * sizeof(_prioqelt));
119   for (i=0; i<pq->hash_key_size; i++) pq->hashtab[i]=0;
120 }
121
122 #if CMK_C_INLINE
123 inline
124 #endif
125 /** Double the size of a Priority Queue's heap */
126 static void CqsPrioqExpand(_prioq pq)
127 {
128   int oldsize = pq->heapsize;
129   int newsize = oldsize * 2;
130   _prioqelt *oheap = pq->heap;
131   _prioqelt *nheap = (_prioqelt *)CmiAlloc(newsize*sizeof(_prioqelt));
132   memcpy(nheap, oheap, oldsize * sizeof(_prioqelt));
133   pq->heap = nheap;
134   pq->heapsize = newsize;
135   CmiFree(oheap);
136 }
137 #ifndef FASTQ
138 /** Double the size of a Priority Queue's hash table */
139 void CqsPrioqRehash(_prioq pq)
140 {
141   int oldHsize = pq->hash_key_size;
142   int newHsize = oldHsize * 2;
143   unsigned int hashval;
144   _prioqelt pe, pe1, pe2;
145   int i,j;
146
147   _prioqelt *ohashtab = pq->hashtab;
148   _prioqelt *nhashtab = (_prioqelt *)CmiAlloc(newHsize*sizeof(_prioqelt));
149
150   pq->hash_key_size = newHsize;
151
152   for(i=0; i<newHsize; i++)
153     nhashtab[i] = 0;
154
155   for(i=0; i<oldHsize; i++) {
156     for(pe=ohashtab[i]; pe; ) {
157       pe2 = pe->ht_next;
158       hashval = pe->pri.bits;
159       for (j=0; j<pe->pri.ints; j++) hashval ^= pe->pri.data[j];
160       hashval = (hashval&0x7FFFFFFF)%newHsize;
161
162       pe1=nhashtab[hashval];
163       pe->ht_next = pe1;
164       pe->ht_handle = (nhashtab+hashval);
165       if (pe1) pe1->ht_handle = &(pe->ht_next);
166       nhashtab[hashval]=pe;
167       pe = pe2;
168     }
169   }
170   pq->hashtab = nhashtab;
171   pq->hash_key_size = newHsize;
172   CmiFree(ohashtab);
173 }
174 #endif
175
176 int CqsPrioGT_(unsigned int ints1, unsigned int *data1, unsigned int ints2, unsigned int *data2)
177 {
178   unsigned int val1;
179   unsigned int val2;
180   while (1) {
181     if (ints1==0) return 0;
182     if (ints2==0) return 1;
183     val1 = *data1++;
184     val2 = *data2++;
185     if (val1 < val2) return 0;
186     if (val1 > val2) return 1;
187     ints1--;
188     ints2--;
189   }
190 }
191
192 /**
193  * Compare two priorities (treated as unsigned).
194  * @return 1 if prio1 > prio2
195  * @return ? if prio1 == prio2
196  * @return 0 if prio1 < prio2
197  */
198 int CqsPrioGT(_prio prio1, _prio prio2)
199 {
200 #ifndef FASTQ
201   unsigned int ints1 = prio1->ints;
202   unsigned int ints2 = prio2->ints;
203 #endif
204   unsigned int *data1 = prio1->data;
205   unsigned int *data2 = prio2->data;
206 #ifndef FASTQ
207   unsigned int val1;
208   unsigned int val2;
209 #endif
210   while (1) {
211 #ifndef FASTQ
212     if (ints1==0) return 0;
213     if (ints2==0) return 1;
214 #else
215     if (prio1->ints==0) return 0;
216     if (prio2->ints==0) return 1;
217 #endif
218 #ifndef FASTQ
219     val1 = *data1++;
220     val2 = *data2++;
221     if (val1 < val2) return 0;
222     if (val1 > val2) return 1;
223     ints1--;
224     ints2--;
225 #else
226     if(*data1++ < *data2++) return 0;
227     if(*data1++ > *data2++) return 1;
228     (prio1->ints)--;
229     (prio2->ints)--;
230 #endif
231   }
232 }
233
234 /** Find or create a bucket in the hash table for the specified priority. */
235 _deq CqsPrioqGetDeq(_prioq pq, unsigned int priobits, unsigned int *priodata)
236 {
237   unsigned int prioints = (priobits+CINTBITS-1)/CINTBITS;
238   unsigned int hashval, i;
239   int heappos; 
240   _prioqelt *heap, pe, next, parent;
241   _prio pri;
242   int mem_cmp_res;
243   unsigned int pri_bits_cmp;
244   static int cnt_nilesh=0;
245
246 #ifdef FASTQ
247   /*  printf("Hi I'm here %d\n",cnt_nilesh++); */
248 #endif
249   /* Scan for priority in hash-table, and return it if present */
250   hashval = priobits;
251   for (i=0; i<prioints; i++) hashval ^= priodata[i];
252   hashval = (hashval&0x7FFFFFFF)%PRIOQ_TABSIZE;
253 #ifndef FASTQ
254   for (pe=pq->hashtab[hashval]; pe; pe=pe->ht_next)
255     if (priobits == pe->pri.bits)
256       if (memcmp(priodata, pe->pri.data, sizeof(int)*prioints)==0)
257         return &(pe->data);
258 #else
259   parent=NULL;
260   for(pe=pq->hashtab[hashval]; pe; )
261   {
262     parent=pe;
263     pri_bits_cmp=pe->pri.bits;
264     mem_cmp_res=memcmp(priodata,pe->pri.data,sizeof(int)*prioints);
265     if(priobits == pri_bits_cmp && mem_cmp_res==0)
266       return &(pe->data);
267     else if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
268     {
269       pe=pe->ht_right;
270     }
271     else 
272     {
273       pe=pe->ht_left;
274     }
275   }
276 #endif
277   
278   /* If not present, allocate a bucket for specified priority */
279   pe = (_prioqelt)CmiAlloc(sizeof(struct prioqelt_struct)+((prioints-1)*sizeof(int)));
280   pe->pri.bits = priobits;
281   pe->pri.ints = prioints;
282   memcpy(pe->pri.data, priodata, (prioints*sizeof(int)));
283   CqsDeqInit(&(pe->data));
284   pri=&(pe->pri);
285
286   /* Insert bucket into hash-table */
287   next = pq->hashtab[hashval];
288 #ifndef FASTQ
289   pe->ht_next = next;
290   pe->ht_handle = (pq->hashtab+hashval);
291   if (next) next->ht_handle = &(pe->ht_next);
292   pq->hashtab[hashval] = pe;
293 #else
294   pe->ht_parent = parent;
295   pe->ht_left = NULL;
296   pe->ht_right = NULL;
297   if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
298   {
299     if(parent) {
300       parent->ht_right = pe;
301       pe->ht_handle = &(parent->ht_right);
302     }
303     else {
304       pe->ht_handle = (pq->hashtab+hashval);
305       pq->hashtab[hashval] = pe;
306     }
307     /*    pe->ht_handle = &(pe); */
308   }
309   else
310   {
311     if(parent) {
312       parent->ht_left = pe;
313       pe->ht_handle = &(parent->ht_left);
314     }
315     else {
316       pe->ht_handle = (pq->hashtab+hashval);
317       pq->hashtab[hashval] = pe;
318     }
319     /*    pe->ht_handle = &(pe); */
320   }
321   if(!next)
322     pq->hashtab[hashval] = pe;
323 #endif
324   pq->hash_entry_size++;
325 #ifndef FASTQ
326   if(pq->hash_entry_size > 2*pq->hash_key_size)
327     CqsPrioqRehash(pq);
328 #endif  
329   /* Insert bucket into heap */
330   heappos = pq->heapnext++;
331   if (heappos == pq->heapsize) CqsPrioqExpand(pq);
332   heap = pq->heap;
333   while (heappos > 1) {
334     int parentpos = (heappos >> 1);
335     _prioqelt parent = heap[parentpos];
336     if (CqsPrioGT(pri, &(parent->pri))) break;
337     heap[heappos] = parent; heappos=parentpos;
338   }
339   heap[heappos] = pe;
340
341 #ifdef FASTQ
342   /*  printf("Hi I'm here222\n"); */
343 #endif
344   
345   return &(pe->data);
346 }
347
348 /** Dequeue an entry */
349 void *CqsPrioqDequeue(_prioq pq)
350 {
351   _prio pri;
352   _prioqelt pe, old; void *data;
353   int heappos, heapnext;
354   _prioqelt *heap = pq->heap;
355   int left_child;
356   _prioqelt temp1_ht_right, temp1_ht_left, temp1_ht_parent;
357   _prioqelt *temp1_ht_handle;
358   static int cnt_nilesh1=0;
359
360 #ifdef FASTQ
361   /*  printf("Hi I'm here too!! %d\n",cnt_nilesh1++); */
362 #endif
363   if (pq->heapnext==1) return 0;
364   pe = heap[1];
365   data = CqsDeqDequeue(&(pe->data));
366   if (pe->data.head == pe->data.tail) {
367     /* Unlink prio-bucket from hash-table */
368 #ifndef FASTQ
369     _prioqelt next = pe->ht_next;
370     _prioqelt *handle = pe->ht_handle;
371     if (next) next->ht_handle = handle;
372     *handle = next;
373     old=pe;
374 #else
375     old=pe;
376     prioqelt *handle;
377     if(pe->ht_parent)
378     { 
379       if(pe->ht_parent->ht_left==pe) left_child=1;
380       else left_child=0;
381     }
382     else
383       {  /* it is the root in the hashtable entry, so its ht_handle should be used by whoever is the new root */
384       handle = pe->ht_handle;
385     }
386     
387     if(!pe->ht_left && !pe->ht_right)
388     {
389       if(pe->ht_parent) {
390         if(left_child) pe->ht_parent->ht_left=NULL;
391         else pe->ht_parent->ht_right=NULL;
392       }
393       else {
394         *handle = NULL;
395       }
396     }
397     else if(!pe->ht_right)
398     {
399       /*if the node does not have a right subtree, its left subtree root is the new child of its parent */
400       pe->ht_left->ht_parent=pe->ht_parent;
401       if(pe->ht_parent)
402       {
403         if(left_child) {
404           pe->ht_parent->ht_left = pe->ht_left;
405           pe->ht_left->ht_handle = &(pe->ht_parent->ht_left);
406         }
407         else {
408           pe->ht_parent->ht_right = pe->ht_left;
409           pe->ht_left->ht_handle = &(pe->ht_parent->ht_right);
410         }
411       }
412       else {
413         pe->ht_left->ht_handle = handle;
414         *handle = pe->ht_left;
415       }
416     }
417     else if(!pe->ht_left)
418     {
419       /*if the node does not have a left subtree, its right subtree root is the new child of its parent */
420       pe->ht_right->ht_parent=pe->ht_parent;
421       /*pe->ht_right->ht_left=pe->ht_left; */
422       if(pe->ht_parent)
423       {
424         if(left_child) {
425           pe->ht_parent->ht_left = pe->ht_right;
426           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
427         }
428         else {
429           pe->ht_parent->ht_right = pe->ht_right;
430           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
431         }
432       }
433       else {
434         pe->ht_right->ht_handle = handle;
435         *handle = pe->ht_right;
436       }
437     }
438     else if(!pe->ht_right->ht_left)
439     {
440       pe->ht_right->ht_parent=pe->ht_parent;
441       if(pe->ht_parent)
442       {
443         if(left_child) {
444           pe->ht_parent->ht_left = pe->ht_right;
445           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
446         }
447         else {
448           pe->ht_parent->ht_right = pe->ht_right;
449           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
450         }
451       }
452       else {
453         pe->ht_right->ht_handle = handle;
454         *handle = pe->ht_right;
455       }
456       if(pe->ht_left) {
457         pe->ht_right->ht_left = pe->ht_left;
458         pe->ht_left->ht_parent = pe->ht_right;
459         pe->ht_left->ht_handle = &(pe->ht_right->ht_left);
460       }
461     }
462     else
463     {
464       /*if it has both subtrees, swap it with its successor */
465       for(pe=pe->ht_right; pe; )
466       {
467         if(pe->ht_left) pe=pe->ht_left;
468         else  /*found the sucessor */
469           { /*take care of the connections */
470           if(old->ht_parent)
471           {
472             if(left_child) {
473               old->ht_parent->ht_left = pe;
474               pe->ht_handle = &(old->ht_parent->ht_left);
475             }
476             else {
477               old->ht_parent->ht_right = pe;
478               pe->ht_handle = &(old->ht_parent->ht_right);
479             }
480           }
481           else {
482             pe->ht_handle = handle;
483             *handle = pe;
484           }
485           temp1_ht_right = pe->ht_right;
486           temp1_ht_left = pe->ht_left;
487           temp1_ht_parent = pe->ht_parent;
488           temp1_ht_handle = pe->ht_handle;
489
490           pe->ht_parent = old->ht_parent;
491           pe->ht_left = old->ht_left;
492           pe->ht_right = old->ht_right;
493           if(pe->ht_left) {
494             pe->ht_left->ht_parent = pe;
495             pe->ht_right->ht_handle = &(pe->ht_right);
496           }
497           if(pe->ht_right) {
498             pe->ht_right->ht_parent = pe;
499             pe->ht_right->ht_handle = &(pe->ht_right);
500           }
501           temp1_ht_parent->ht_left = temp1_ht_right;
502           if(temp1_ht_right) {
503             temp1_ht_right->ht_handle = &(temp1_ht_parent->ht_left);
504             temp1_ht_right->ht_parent = temp1_ht_parent;
505           }
506           break;
507         }
508       }
509     }
510 #endif
511     pq->hash_entry_size--;
512     
513     /* Restore the heap */
514     heapnext = (--pq->heapnext);
515     pe = heap[heapnext];
516     pri = &(pe->pri);
517     heappos = 1;
518     while (1) {
519       int childpos1, childpos2, childpos;
520       _prioqelt ch1, ch2, child;
521       childpos1 = heappos<<1;
522       if (childpos1>=heapnext) break;
523       childpos2 = childpos1+1;
524       if (childpos2>=heapnext)
525         { childpos=childpos1; child=heap[childpos1]; }
526       else {
527         ch1 = heap[childpos1];
528         ch2 = heap[childpos2];
529         if (CqsPrioGT(&(ch1->pri), &(ch2->pri)))
530              {childpos=childpos2; child=ch2;}
531         else {childpos=childpos1; child=ch1;}
532       }
533       if (CqsPrioGT(&(child->pri), pri)) break;
534       heap[heappos]=child; heappos=childpos;
535     }
536     heap[heappos]=pe;
537     
538     /* Free prio-bucket */
539     if (old->data.bgn != old->data.space) CmiFree(old->data.bgn);
540     CmiFree(old);
541   }
542   return data;
543 }
544
545 Queue CqsCreate(void)
546 {
547   Queue q = (Queue)CmiAlloc(sizeof(struct Queue_struct));
548   q->length = 0;
549   q->maxlen = 0;
550 #ifdef FASTQ
551   /*  printf("\nIN fastq"); */
552 #endif
553   CqsDeqInit(&(q->zeroprio));
554   CqsPrioqInit(&(q->negprioq));
555   CqsPrioqInit(&(q->posprioq));
556   return q;
557 }
558
559 void CqsDelete(Queue q)
560 {
561   CmiFree(q->negprioq.heap);
562   CmiFree(q->posprioq.heap);
563   CmiFree(q);
564 }
565
566 unsigned int CqsLength(Queue q)
567 {
568   return q->length;
569 }
570
571 unsigned int CqsMaxLength(Queue q)
572 {
573   return q->maxlen;
574 }
575
576 int CqsEmpty(Queue q)
577 {
578   return (q->length == 0);
579 }
580
581 void CqsEnqueueGeneral(Queue q, void *data, int strategy, 
582            int priobits,unsigned int *prioptr)
583 {
584   _deq d; int iprio;
585   CmiInt8 lprio0, lprio;
586   switch (strategy) {
587   case CQS_QUEUEING_FIFO: 
588     CqsDeqEnqueueFifo(&(q->zeroprio), data); 
589     break;
590   case CQS_QUEUEING_LIFO: 
591     CqsDeqEnqueueLifo(&(q->zeroprio), data); 
592     break;
593   case CQS_QUEUEING_IFIFO:
594     iprio=prioptr[0]+(1U<<(CINTBITS-1));
595     if ((int)iprio<0)
596       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
597     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
598     CqsDeqEnqueueFifo(d, data);
599     break;
600   case CQS_QUEUEING_ILIFO:
601     iprio=prioptr[0]+(1U<<(CINTBITS-1));
602     if ((int)iprio<0)
603       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
604     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
605     CqsDeqEnqueueLifo(d, data);
606     break;
607   case CQS_QUEUEING_BFIFO:
608     if (priobits&&(((int)(prioptr[0]))<0))
609        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
610     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
611     CqsDeqEnqueueFifo(d, data);
612     break;
613   case CQS_QUEUEING_BLIFO:
614     if (priobits&&(((int)(prioptr[0]))<0))
615        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
616     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
617     CqsDeqEnqueueLifo(d, data);
618     break;
619
620     /* The following two cases have a 64bit integer as priority value. Therefore,
621      * we can cast the address of the CmiInt8 lprio to an "unsigned int" pointer
622      * when passing it to CqsPrioqGetDeq. The endianness is taken care explicitly.
623      */
624   case CQS_QUEUEING_LFIFO:     
625     CmiAssert(priobits == CLONGBITS);
626     lprio0 =((CmiInt8 *)prioptr)[0];
627     lprio0 += (1ULL<<(CLONGBITS-1));
628     if (CmiEndianness() == 0) {           /* little-endian */
629       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
630     }
631     else {                /* little-endian */
632       lprio = lprio0;
633     }
634     if (lprio0<0)
635         d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
636     else
637         d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
638     CqsDeqEnqueueFifo(d, data);
639     break;
640   case CQS_QUEUEING_LLIFO:
641     lprio0 =((CmiInt8 *)prioptr)[0];
642     lprio0 += (1ULL<<(CLONGBITS-1));
643     if (CmiEndianness() == 0) {           /* little-endian happen to compare least significant part first */
644       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
645     }
646     else {                /* little-endian */
647       lprio = lprio0;
648     }
649     if (lprio0<0)
650         d=CqsPrioqGetDeq(&(q->posprioq), priobits, (unsigned int *)&lprio);
651     else
652         d=CqsPrioqGetDeq(&(q->negprioq), priobits, (unsigned int *)&lprio);
653     CqsDeqEnqueueLifo(d, data);
654     break;
655   default:
656     CmiAbort("CqsEnqueueGeneral: invalid queueing strategy.\n");
657   }
658   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
659 }
660
661 void CqsEnqueueFifo(Queue q, void *data)
662 {
663   CqsDeqEnqueueFifo(&(q->zeroprio), data);
664   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
665 }
666
667 void CqsEnqueueLifo(Queue q, void *data)
668 {
669   CqsDeqEnqueueLifo(&(q->zeroprio), data);
670   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
671 }
672
673 void CqsEnqueue(Queue q, void *data)
674 {
675   CqsDeqEnqueueFifo(&(q->zeroprio), data);
676   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
677 }
678
679 void CqsDequeue(Queue q, void **resp)
680 {
681 #ifdef ADAPT_SCHED_MEM
682     /* Added by Isaac for testing purposes: */
683     if((q->length > 1) && (CmiMemoryUsage() > schedAdaptMemThresholdMB*1024*1024) ){
684         /* CqsIncreasePriorityForEntryMethod(q, 153); */
685         CqsIncreasePriorityForMemCriticalEntries(q); 
686     }
687 #endif
688     
689   if (q->length==0) 
690     { *resp = 0; return; }
691   if (q->negprioq.heapnext>1)
692     { *resp = CqsPrioqDequeue(&(q->negprioq)); q->length--; return; }
693   if (q->zeroprio.head != q->zeroprio.tail)
694     { *resp = CqsDeqDequeue(&(q->zeroprio)); q->length--; return; }
695   if (q->posprioq.heapnext>1)
696     { *resp = CqsPrioqDequeue(&(q->posprioq)); q->length--; return; }
697   *resp = 0; return;
698 }
699
700 static struct prio_struct kprio_zero = { 0, 0, {0} };
701 static struct prio_struct kprio_max  = { 32, 1, {((unsigned int)(-1))} };
702
703 _prio CqsGetPriority(Queue q)
704 {
705   if (q->negprioq.heapnext>1) return &(q->negprioq.heap[1]->pri);
706   if (q->zeroprio.head != q->zeroprio.tail) { return &kprio_zero; }
707   if (q->posprioq.heapnext>1) return &(q->posprioq.heap[1]->pri);
708   return &kprio_max;
709 }
710
711
712 /* prio CqsGetSecondPriority(q) */
713 /* Queue q; */
714 /* { */
715 /*   return CqsGetPriority(q); */
716 /* } */
717
718
719 /** Produce an array containing all the entries in a deq
720     @return a newly allocated array filled with copies of the (void*) elements in the deq. 
721     @param [in] q a deq
722     @param [out] num the number of pointers in the returned array
723 */
724 void** CqsEnumerateDeq(_deq q, int *num){
725   void **head, **tail;
726   void **result;
727   int count = 0;
728   int i;
729
730   head = q->head;
731   tail = q->tail;
732
733   while(head != tail){
734     count++;
735     head++;
736     if(head == q->end)
737       head = q->bgn;
738   }
739
740   result = (void **)CmiAlloc(count * sizeof(void *));
741   i = 0;
742   head = q->head;
743   tail = q->tail;
744   while(head != tail){
745     result[i] = *head;
746     i++;
747     head++;
748     if(head == q->end)
749       head = q->bgn;
750   }
751   *num = count;
752   return(result);
753 }
754
755 /** Produce an array containing all the entries in a prioq
756     @return a newly allocated array filled with copies of the (void*) elements in the prioq. 
757     @param [in] q a deq
758     @param [out] num the number of pointers in the returned array
759 */
760 void** CqsEnumeratePrioq(_prioq q, int *num){
761   void **head, **tail;
762   void **result;
763   int i,j;
764   int count = 0;
765   _prioqelt pe;
766
767   for(i = 1; i < q->heapnext; i++){
768     pe = (q->heap)[i];
769     head = pe->data.head;
770     tail = pe->data.tail;
771     while(head != tail){
772       count++;
773       head++;
774       if(head == (pe->data).end)
775         head = (pe->data).bgn;
776     }
777   }
778
779   result = (void **)CmiAlloc((count) * sizeof(void *));
780   *num = count;
781   
782   j = 0;
783   for(i = 1; i < q->heapnext; i++){
784     pe = (q->heap)[i];
785     head = pe->data.head;
786     tail = pe->data.tail;
787     while(head != tail){
788       result[j] = *head;
789       j++;
790       head++;
791       if(head ==(pe->data).end)
792         head = (pe->data).bgn; 
793     }
794   }
795
796   return result;
797 }
798
799 void CqsEnumerateQueue(Queue q, void ***resp){
800   void **result;
801   int num;
802   int i,j;
803
804   *resp = (void **)CmiAlloc(q->length * sizeof(void *));
805   j = 0;
806
807   result = CqsEnumeratePrioq(&(q->negprioq), &num);
808   for(i = 0; i < num; i++){
809     (*resp)[j] = result[i];
810     j++;
811   }
812   CmiFree(result);
813   
814   result = CqsEnumerateDeq(&(q->zeroprio), &num);
815   for(i = 0; i < num; i++){
816     (*resp)[j] = result[i];
817     j++;
818   }
819   CmiFree(result);
820
821   result = CqsEnumeratePrioq(&(q->posprioq), &num);
822   for(i = 0; i < num; i++){
823     (*resp)[j] = result[i];
824     j++;
825   }
826   CmiFree(result);
827 }
828
829 /**
830    Remove first occurence of a specified entry from the deq  by
831    setting the entry to NULL.
832
833    The size of the deq will not change, it will now just contain an
834    entry for a NULL pointer.
835
836    @return number of entries that were replaced with NULL
837 */
838 int CqsRemoveSpecificDeq(_deq q, const void *msgPtr){
839   void **head, **tail;
840
841   head = q->head;
842   tail = q->tail;
843
844   while(head != tail){
845     if(*head == msgPtr){
846       /*    CmiPrintf("Replacing %p in deq with NULL\n", msgPtr); */
847       /*     *head = NULL;  */
848       return 1;
849     }
850     head++;
851     if(head == q->end)
852       head = q->bgn;
853   }
854   return 0;
855 }
856
857 /**
858    Remove first occurence of a specified entry from the prioq by
859    setting the entry to NULL.
860
861    The size of the prioq will not change, it will now just contain an
862    entry for a NULL pointer.
863
864    @return number of entries that were replaced with NULL
865 */
866 int CqsRemoveSpecificPrioq(_prioq q, const void *msgPtr){
867   void **head, **tail;
868   void **result;
869   int i;
870   _prioqelt pe;
871
872   for(i = 1; i < q->heapnext; i++){
873     pe = (q->heap)[i];
874     head = pe->data.head;
875     tail = pe->data.tail;
876     while(head != tail){
877       if(*head == msgPtr){
878         /*      CmiPrintf("Replacing %p in prioq with NULL\n", msgPtr); */
879         *head = NULL;
880         return 1;
881       }     
882       head++;
883       if(head == (pe->data).end)
884         head = (pe->data).bgn;
885     }
886   } 
887   return 0;
888 }
889
890 void CqsRemoveSpecific(Queue q, const void *msgPtr){
891   if( CqsRemoveSpecificPrioq(&(q->negprioq), msgPtr) == 0 )
892     if( CqsRemoveSpecificDeq(&(q->zeroprio), msgPtr) == 0 )  
893       if(CqsRemoveSpecificPrioq(&(q->posprioq), msgPtr) == 0){
894         CmiPrintf("Didn't remove the specified entry because it was not found\n");
895       }  
896 }
897
898 #ifdef ADAPT_SCHED_MEM
899 int numMemCriticalEntries=0;
900 int *memCriticalEntries=NULL;
901 #endif
902
903 /** @} */