Improving documentation for scheduling routines.
[charm.git] / src / conv-core / queueing.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <converse.h>
9 #include <string.h>
10 #include "queueing.h"
11
12 /** @defgroup CharmScheduler 
13     @brief The portion of Charm++ responsible for scheduling the execution 
14     of Charm++ entry methods
15
16     CqsEnqueueGeneral() is the main function that is responsible for enqueueing 
17     messages. It will store the messages in one of three queues based on the 
18     specified priorities or strategies. The Charm++ message queue is really three 
19     queues, one for positive priorities, one for zero priorities, and one for 
20     negative priorities. The positive and negative priorty queues are actually heaps.
21
22
23     The charm++ messages are only scheduled after the \ref ConverseScheduler "converse message queues"
24     have been emptied:
25
26     - CsdScheduleForever() is the commonly used Converse scheduler loop
27        - CsdNextMessage()
28           - First processes converse message from a  \ref ConverseScheduler "converse queue" if one exists using CdsFifo_Dequeue() 
29           - Then if no converse message was found, call CqsDequeue() which gets a charm++ message to execute if one exists
30
31
32     @file
33     Implementation of queuing data structure functions.
34     @ingroup CharmScheduler
35     
36     @addtogroup CharmScheduler
37     @{
38 */
39
40
41 /** A memory limit threshold for adaptively scheduling */
42 int schedAdaptMemThresholdMB;
43
44
45 /** Initialize a deq */
46 static void CqsDeqInit(d)
47 deq d;
48 {
49   d->bgn  = d->space;
50   d->end  = d->space+4;
51   d->head = d->space;
52   d->tail = d->space;
53 }
54
55 /** Double the size of a deq */
56 static void CqsDeqExpand(d)
57 deq d;
58 {
59   int rsize = (d->end - d->head);
60   int lsize = (d->head - d->bgn);
61   int oldsize = (d->end - d->bgn);
62   int newsize = (oldsize << 1);
63   void **ovec = d->bgn;
64   void **nvec = (void **)CmiAlloc(newsize * sizeof(void *));
65   memcpy(nvec, d->head, rsize * sizeof(void *));
66   memcpy(nvec+rsize, d->bgn, lsize * sizeof(void *));
67   d->bgn = nvec;
68   d->end = nvec + newsize;
69   d->head = nvec;
70   d->tail = nvec + oldsize;
71   if (ovec != d->space) CmiFree(ovec);
72 }
73
74 /** Insert a data pointer at the tail of a deq */
75 void CqsDeqEnqueueFifo(d, data)
76 deq d; void *data;
77 {
78   void **tail = d->tail;
79   *tail = data;
80   tail++;
81   if (tail == d->end) tail = d->bgn;
82   d->tail = tail;
83   if (tail == d->head) CqsDeqExpand(d);
84 }
85
86 /** Insert a data pointer at the head of a deq */
87 void CqsDeqEnqueueLifo(d, data)
88 deq d; void *data;
89 {
90   void **head = d->head;
91   if (head == d->bgn) head = d->end;
92   head--;
93   *head = data;
94   d->head = head;
95   if (head == d->tail) CqsDeqExpand(d);
96 }
97
98 /** Remove a data pointer from the head of a deq */
99 void *CqsDeqDequeue(d)
100 deq d;
101 {
102   void **head;
103   void **tail;
104   void *data;
105   head = d->head;
106   tail = d->tail;
107   if (head == tail) return 0;
108   data = *head;
109   head++;
110   if (head == d->end) head = d->bgn;
111   d->head = head;
112   return data;
113 }
114
115 /** Initialize a Priority Queue */
116 static void CqsPrioqInit(pq)
117 prioq pq;
118 {
119   int i;
120   pq->heapsize = 100;
121   pq->heapnext = 1;
122   pq->hash_key_size = PRIOQ_TABSIZE;
123   pq->hash_entry_size = 0;
124   pq->heap = (prioqelt *)CmiAlloc(100 * sizeof(prioqelt));
125   pq->hashtab = (prioqelt *)CmiAlloc(pq->hash_key_size * sizeof(prioqelt));
126   for (i=0; i<pq->hash_key_size; i++) pq->hashtab[i]=0;
127 }
128
129 #if CMK_C_INLINE
130 inline
131 #endif
132 /** Double the size of a Priority Queue's heap */
133 static void CqsPrioqExpand(prioq pq)
134 {
135   int oldsize = pq->heapsize;
136   int newsize = oldsize * 2;
137   prioqelt *oheap = pq->heap;
138   prioqelt *nheap = (prioqelt *)CmiAlloc(newsize*sizeof(prioqelt));
139   memcpy(nheap, oheap, oldsize * sizeof(prioqelt));
140   pq->heap = nheap;
141   pq->heapsize = newsize;
142   CmiFree(oheap);
143 }
144 #ifndef FASTQ
145 /** Double the size of a Priority Queue's hash table */
146 void CqsPrioqRehash(pq)
147      prioq pq;
148 {
149   int oldHsize = pq->hash_key_size;
150   int newHsize = oldHsize * 2;
151   unsigned int hashval;
152   prioqelt pe, pe1, pe2;
153   int i,j;
154
155   prioqelt *ohashtab = pq->hashtab;
156   prioqelt *nhashtab = (prioqelt *)CmiAlloc(newHsize*sizeof(prioqelt));
157
158   pq->hash_key_size = newHsize;
159
160   for(i=0; i<newHsize; i++)
161     nhashtab[i] = 0;
162
163   for(i=0; i<oldHsize; i++) {
164     for(pe=ohashtab[i]; pe; ) {
165       pe2 = pe->ht_next;
166       hashval = pe->pri.bits;
167       for (j=0; j<pe->pri.ints; j++) hashval ^= pe->pri.data[j];
168       hashval = (hashval&0x7FFFFFFF)%newHsize;
169
170       pe1=nhashtab[hashval];
171       pe->ht_next = pe1;
172       pe->ht_handle = (nhashtab+hashval);
173       if (pe1) pe1->ht_handle = &(pe->ht_next);
174       nhashtab[hashval]=pe;
175       pe = pe2;
176     }
177   }
178   pq->hashtab = nhashtab;
179   pq->hash_key_size = newHsize;
180   CmiFree(ohashtab);
181 }
182 #endif
183
184 /**
185  * Compare two priorities (treated as unsigned).
186  * @return 1 if prio1 > prio2
187  * @return ? if prio1 == prio2
188  * @return 0 if prio1 < prio2
189  */
190 int CqsPrioGT(prio1, prio2)
191 prio prio1;
192 prio prio2;
193 {
194 #ifndef FASTQ
195   unsigned int ints1 = prio1->ints;
196   unsigned int ints2 = prio2->ints;
197 #endif
198   unsigned int *data1 = prio1->data;
199   unsigned int *data2 = prio2->data;
200 #ifndef FASTQ
201   unsigned int val1;
202   unsigned int val2;
203 #endif
204   while (1) {
205 #ifndef FASTQ
206     if (ints1==0) return 0;
207     if (ints2==0) return 1;
208 #else
209     if (prio1->ints==0) return 0;
210     if (prio2->ints==0) return 1;
211 #endif
212 #ifndef FASTQ
213     val1 = *data1++;
214     val2 = *data2++;
215     if (val1 < val2) return 0;
216     if (val1 > val2) return 1;
217     ints1--;
218     ints2--;
219 #else
220     if(*data1++ < *data2++) return 0;
221     if(*data1++ > *data2++) return 1;
222     (prio1->ints)--;
223     (prio2->ints)--;
224 #endif
225   }
226 }
227
228 /** Find or create a bucket in the hash table for the specified priority. */
229 deq CqsPrioqGetDeq(pq, priobits, priodata)
230 prioq pq;
231 unsigned int priobits, *priodata;
232 {
233   unsigned int prioints = (priobits+CINTBITS-1)/CINTBITS;
234   unsigned int hashval, i;
235   int heappos; 
236   prioqelt *heap, pe, next, parent;
237   prio pri;
238   int mem_cmp_res;
239   unsigned int pri_bits_cmp;
240   static int cnt_nilesh=0;
241
242 #ifdef FASTQ
243   /*  printf("Hi I'm here %d\n",cnt_nilesh++); */
244 #endif
245   /* Scan for priority in hash-table, and return it if present */
246   hashval = priobits;
247   for (i=0; i<prioints; i++) hashval ^= priodata[i];
248   hashval = (hashval&0x7FFFFFFF)%PRIOQ_TABSIZE;
249 #ifndef FASTQ
250   for (pe=pq->hashtab[hashval]; pe; pe=pe->ht_next)
251     if (priobits == pe->pri.bits)
252       if (memcmp(priodata, pe->pri.data, sizeof(int)*prioints)==0)
253         return &(pe->data);
254 #else
255   parent=NULL;
256   for(pe=pq->hashtab[hashval]; pe; )
257   {
258     parent=pe;
259     pri_bits_cmp=pe->pri.bits;
260     mem_cmp_res=memcmp(priodata,pe->pri.data,sizeof(int)*prioints);
261     if(priobits == pri_bits_cmp && mem_cmp_res==0)
262       return &(pe->data);
263     else if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
264     {
265       pe=pe->ht_right;
266     }
267     else 
268     {
269       pe=pe->ht_left;
270     }
271   }
272 #endif
273   
274   /* If not present, allocate a bucket for specified priority */
275   pe = (prioqelt)CmiAlloc(sizeof(struct prioqelt_struct)+((prioints-1)*sizeof(int)));
276   pe->pri.bits = priobits;
277   pe->pri.ints = prioints;
278   memcpy(pe->pri.data, priodata, (prioints*sizeof(int)));
279   CqsDeqInit(&(pe->data));
280   pri=&(pe->pri);
281
282   /* Insert bucket into hash-table */
283   next = pq->hashtab[hashval];
284 #ifndef FASTQ
285   pe->ht_next = next;
286   pe->ht_handle = (pq->hashtab+hashval);
287   if (next) next->ht_handle = &(pe->ht_next);
288   pq->hashtab[hashval] = pe;
289 #else
290   pe->ht_parent = parent;
291   pe->ht_left = NULL;
292   pe->ht_right = NULL;
293   if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
294   {
295     if(parent) {
296       parent->ht_right = pe;
297       pe->ht_handle = &(parent->ht_right);
298     }
299     else {
300       pe->ht_handle = (pq->hashtab+hashval);
301       pq->hashtab[hashval] = pe;
302     }
303     /*    pe->ht_handle = &(pe); */
304   }
305   else
306   {
307     if(parent) {
308       parent->ht_left = pe;
309       pe->ht_handle = &(parent->ht_left);
310     }
311     else {
312       pe->ht_handle = (pq->hashtab+hashval);
313       pq->hashtab[hashval] = pe;
314     }
315     /*    pe->ht_handle = &(pe); */
316   }
317   if(!next)
318     pq->hashtab[hashval] = pe;
319 #endif
320   pq->hash_entry_size++;
321 #ifndef FASTQ
322   if(pq->hash_entry_size > 2*pq->hash_key_size)
323     CqsPrioqRehash(pq);
324 #endif  
325   /* Insert bucket into heap */
326   heappos = pq->heapnext++;
327   if (heappos == pq->heapsize) CqsPrioqExpand(pq);
328   heap = pq->heap;
329   while (heappos > 1) {
330     int parentpos = (heappos >> 1);
331     prioqelt parent = heap[parentpos];
332     if (CqsPrioGT(pri, &(parent->pri))) break;
333     heap[heappos] = parent; heappos=parentpos;
334   }
335   heap[heappos] = pe;
336
337 #ifdef FASTQ
338   /*  printf("Hi I'm here222\n"); */
339 #endif
340   
341   return &(pe->data);
342 }
343
344 /** Dequeue an entry */
345 void *CqsPrioqDequeue(pq)
346 prioq pq;
347 {
348   prio pri;
349   prioqelt pe, old; void *data;
350   int heappos, heapnext;
351   prioqelt *heap = pq->heap;
352   int left_child;
353   prioqelt temp1_ht_right, temp1_ht_left, temp1_ht_parent;
354   prioqelt *temp1_ht_handle;
355   static int cnt_nilesh1=0;
356
357 #ifdef FASTQ
358   /*  printf("Hi I'm here too!! %d\n",cnt_nilesh1++); */
359 #endif
360   if (pq->heapnext==1) return 0;
361   pe = heap[1];
362   data = CqsDeqDequeue(&(pe->data));
363   if (pe->data.head == pe->data.tail) {
364     /* Unlink prio-bucket from hash-table */
365 #ifndef FASTQ
366     prioqelt next = pe->ht_next;
367     prioqelt *handle = pe->ht_handle;
368     if (next) next->ht_handle = handle;
369     *handle = next;
370     old=pe;
371 #else
372     old=pe;
373     prioqelt *handle;
374     if(pe->ht_parent)
375     { 
376       if(pe->ht_parent->ht_left==pe) left_child=1;
377       else left_child=0;
378     }
379     else
380       {  /* it is the root in the hashtable entry, so its ht_handle should be used by whoever is the new root */
381       handle = pe->ht_handle;
382     }
383     
384     if(!pe->ht_left && !pe->ht_right)
385     {
386       if(pe->ht_parent) {
387         if(left_child) pe->ht_parent->ht_left=NULL;
388         else pe->ht_parent->ht_right=NULL;
389       }
390       else {
391         *handle = NULL;
392       }
393     }
394     else if(!pe->ht_right)
395     {
396       /*if the node does not have a right subtree, its left subtree root is the new child of its parent */
397       pe->ht_left->ht_parent=pe->ht_parent;
398       if(pe->ht_parent)
399       {
400         if(left_child) {
401           pe->ht_parent->ht_left = pe->ht_left;
402           pe->ht_left->ht_handle = &(pe->ht_parent->ht_left);
403         }
404         else {
405           pe->ht_parent->ht_right = pe->ht_left;
406           pe->ht_left->ht_handle = &(pe->ht_parent->ht_right);
407         }
408       }
409       else {
410         pe->ht_left->ht_handle = handle;
411         *handle = pe->ht_left;
412       }
413     }
414     else if(!pe->ht_left)
415     {
416       /*if the node does not have a left subtree, its right subtree root is the new child of its parent */
417       pe->ht_right->ht_parent=pe->ht_parent;
418       /*pe->ht_right->ht_left=pe->ht_left; */
419       if(pe->ht_parent)
420       {
421         if(left_child) {
422           pe->ht_parent->ht_left = pe->ht_right;
423           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
424         }
425         else {
426           pe->ht_parent->ht_right = pe->ht_right;
427           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
428         }
429       }
430       else {
431         pe->ht_right->ht_handle = handle;
432         *handle = pe->ht_right;
433       }
434     }
435     else if(!pe->ht_right->ht_left)
436     {
437       pe->ht_right->ht_parent=pe->ht_parent;
438       if(pe->ht_parent)
439       {
440         if(left_child) {
441           pe->ht_parent->ht_left = pe->ht_right;
442           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
443         }
444         else {
445           pe->ht_parent->ht_right = pe->ht_right;
446           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
447         }
448       }
449       else {
450         pe->ht_right->ht_handle = handle;
451         *handle = pe->ht_right;
452       }
453       if(pe->ht_left) {
454         pe->ht_right->ht_left = pe->ht_left;
455         pe->ht_left->ht_parent = pe->ht_right;
456         pe->ht_left->ht_handle = &(pe->ht_right->ht_left);
457       }
458     }
459     else
460     {
461       /*if it has both subtrees, swap it with its successor */
462       for(pe=pe->ht_right; pe; )
463       {
464         if(pe->ht_left) pe=pe->ht_left;
465         else  /*found the sucessor */
466           { /*take care of the connections */
467           if(old->ht_parent)
468           {
469             if(left_child) {
470               old->ht_parent->ht_left = pe;
471               pe->ht_handle = &(old->ht_parent->ht_left);
472             }
473             else {
474               old->ht_parent->ht_right = pe;
475               pe->ht_handle = &(old->ht_parent->ht_right);
476             }
477           }
478           else {
479             pe->ht_handle = handle;
480             *handle = pe;
481           }
482           temp1_ht_right = pe->ht_right;
483           temp1_ht_left = pe->ht_left;
484           temp1_ht_parent = pe->ht_parent;
485           temp1_ht_handle = pe->ht_handle;
486
487           pe->ht_parent = old->ht_parent;
488           pe->ht_left = old->ht_left;
489           pe->ht_right = old->ht_right;
490           if(pe->ht_left) {
491             pe->ht_left->ht_parent = pe;
492             pe->ht_right->ht_handle = &(pe->ht_right);
493           }
494           if(pe->ht_right) {
495             pe->ht_right->ht_parent = pe;
496             pe->ht_right->ht_handle = &(pe->ht_right);
497           }
498           temp1_ht_parent->ht_left = temp1_ht_right;
499           if(temp1_ht_right) {
500             temp1_ht_right->ht_handle = &(temp1_ht_parent->ht_left);
501             temp1_ht_right->ht_parent = temp1_ht_parent;
502           }
503           break;
504         }
505       }
506     }
507 #endif
508     pq->hash_entry_size--;
509     
510     /* Restore the heap */
511     heapnext = (--pq->heapnext);
512     pe = heap[heapnext];
513     pri = &(pe->pri);
514     heappos = 1;
515     while (1) {
516       int childpos1, childpos2, childpos;
517       prioqelt ch1, ch2, child;
518       childpos1 = heappos<<1;
519       if (childpos1>=heapnext) break;
520       childpos2 = childpos1+1;
521       if (childpos2>=heapnext)
522         { childpos=childpos1; child=heap[childpos1]; }
523       else {
524         ch1 = heap[childpos1];
525         ch2 = heap[childpos2];
526         if (CqsPrioGT(&(ch1->pri), &(ch2->pri)))
527              {childpos=childpos2; child=ch2;}
528         else {childpos=childpos1; child=ch1;}
529       }
530       if (CqsPrioGT(&(child->pri), pri)) break;
531       heap[heappos]=child; heappos=childpos;
532     }
533     heap[heappos]=pe;
534     
535     /* Free prio-bucket */
536     if (old->data.bgn != old->data.space) CmiFree(old->data.bgn);
537     CmiFree(old);
538   }
539   return data;
540 }
541
542 /** Initialize a Queue and its three internal queues (for positive, negative, and zero priorities) */
543 Queue CqsCreate(void)
544 {
545   Queue q = (Queue)CmiAlloc(sizeof(struct Queue_struct));
546   q->length = 0;
547   q->maxlen = 0;
548 #ifdef FASTQ
549   /*  printf("\nIN fastq"); */
550 #endif
551   CqsDeqInit(&(q->zeroprio));
552   CqsPrioqInit(&(q->negprioq));
553   CqsPrioqInit(&(q->posprioq));
554   return q;
555 }
556
557 /** Delete a Queue */
558 void CqsDelete(Queue q)
559 {
560   CmiFree(q->negprioq.heap);
561   CmiFree(q->posprioq.heap);
562   CmiFree(q);
563 }
564
565 unsigned int CqsLength(Queue q)
566 {
567   return q->length;
568 }
569
570 unsigned int CqsMaxLength(Queue q)
571 {
572   return q->maxlen;
573 }
574
575 int CqsEmpty(Queue q)
576 {
577   return (q->length == 0);
578 }
579
580
581 /** Enqueue something (usually an envelope*) into the queue in 
582     a manner consistent with the specified strategy and priority.
583 */
584 void CqsEnqueueGeneral(Queue q, void *data, int strategy, 
585            int priobits,unsigned int *prioptr)
586 {
587   deq d; int iprio;
588   CmiInt8 lprio0, lprio;
589   switch (strategy) {
590   case CQS_QUEUEING_FIFO: 
591     CqsDeqEnqueueFifo(&(q->zeroprio), data); 
592     break;
593   case CQS_QUEUEING_LIFO: 
594     CqsDeqEnqueueLifo(&(q->zeroprio), data); 
595     break;
596   case CQS_QUEUEING_IFIFO:
597     iprio=prioptr[0]+(1U<<(CINTBITS-1));
598     if ((int)iprio<0)
599       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
600     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
601     CqsDeqEnqueueFifo(d, data);
602     break;
603   case CQS_QUEUEING_ILIFO:
604     iprio=prioptr[0]+(1U<<(CINTBITS-1));
605     if ((int)iprio<0)
606       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
607     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
608     CqsDeqEnqueueLifo(d, data);
609     break;
610   case CQS_QUEUEING_BFIFO:
611     if (priobits&&(((int)(prioptr[0]))<0))
612        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
613     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
614     CqsDeqEnqueueFifo(d, data);
615     break;
616   case CQS_QUEUEING_BLIFO:
617     if (priobits&&(((int)(prioptr[0]))<0))
618        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
619     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
620     CqsDeqEnqueueLifo(d, data);
621     break;
622
623   case CQS_QUEUEING_LFIFO:     
624     CmiAssert(priobits == CLONGBITS);
625     lprio0 =((CmiUInt8 *)prioptr)[0];
626     lprio0 += (1ULL<<(CLONGBITS-1));
627     if (CmiEndianness() == 0) {           /* little-endian */
628       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
629     }
630     else {                /* little-endian */
631       lprio = lprio0;
632     }
633     if (lprio0<0)
634         d=CqsPrioqGetDeq(&(q->posprioq), priobits, &lprio);
635     else
636         d=CqsPrioqGetDeq(&(q->negprioq), priobits, &lprio);
637     CqsDeqEnqueueFifo(d, data);
638     break;
639   case CQS_QUEUEING_LLIFO:
640     lprio0 =((CmiUInt8 *)prioptr)[0];
641     lprio0 += (1ULL<<(CLONGBITS-1));
642     if (CmiEndianness() == 0) {           /* little-endian happen to compare least significant part first */
643       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
644     }
645     else {                /* little-endian */
646       lprio = lprio0;
647     }
648     if (lprio0<0)
649         d=CqsPrioqGetDeq(&(q->posprioq), priobits, &lprio);
650     else
651         d=CqsPrioqGetDeq(&(q->negprioq), priobits, &lprio);
652     CqsDeqEnqueueLifo(d, data);
653     break;
654   default:
655     CmiAbort("CqsEnqueueGeneral: invalid queueing strategy.\n");
656   }
657   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
658 }
659
660 void CqsEnqueueFifo(Queue q, void *data)
661 {
662   CqsDeqEnqueueFifo(&(q->zeroprio), data);
663   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
664 }
665
666 void CqsEnqueueLifo(Queue q, void *data)
667 {
668   CqsDeqEnqueueLifo(&(q->zeroprio), data);
669   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
670 }
671
672 void CqsEnqueue(Queue q, void *data)
673 {
674   CqsDeqEnqueueFifo(&(q->zeroprio), data);
675   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
676 }
677
678 /** Retrieve the highest priority message (one with most negative priority) */
679 void CqsDequeue(Queue q, void **resp)
680 {
681 #ifdef ADAPT_SCHED_MEM
682     /* Added by Isaac for testing purposes: */
683     if((q->length > 1) && (CmiMemoryUsage() > schedAdaptMemThresholdMB*1024*1024) ){
684         CqsIncreasePriorityForEntryMethod(q, 153);
685     }
686 #endif
687     
688   if (q->length==0) 
689     { *resp = 0; return; }
690   if (q->negprioq.heapnext>1)
691     { *resp = CqsPrioqDequeue(&(q->negprioq)); q->length--; return; }
692   if (q->zeroprio.head != q->zeroprio.tail)
693     { *resp = CqsDeqDequeue(&(q->zeroprio)); q->length--; return; }
694   if (q->posprioq.heapnext>1)
695     { *resp = CqsPrioqDequeue(&(q->posprioq)); q->length--; return; }
696   *resp = 0; return;
697 }
698
699 static struct prio_struct kprio_zero = { 0, 0, {0} };
700 static struct prio_struct kprio_max  = { 32, 1, {((unsigned int)(-1))} };
701 /** Get the priority of the highest priority message in q */
702 prio CqsGetPriority(q)
703 Queue q;
704 {
705   if (q->negprioq.heapnext>1) return &(q->negprioq.heap[1]->pri);
706   if (q->zeroprio.head != q->zeroprio.tail) { return &kprio_zero; }
707   if (q->posprioq.heapnext>1) return &(q->posprioq.heap[1]->pri);
708   return &kprio_max;
709 }
710
711
712 /* prio CqsGetSecondPriority(q) */
713 /* Queue q; */
714 /* { */
715 /*   return CqsGetPriority(q); */
716 /* } */
717
718
719 /** Produce an array containing all the entries in a deq
720     @return a newly allocated array filled with copies of the (void*) elements in the deq. 
721     @param [in] q a deq
722     @param [out] num the number of pointers in the returned array
723 */
724 void** CqsEnumerateDeq(deq q, int *num){
725   void **head, **tail;
726   void **result;
727   int count = 0;
728   int i;
729
730   head = q->head;
731   tail = q->tail;
732
733   while(head != tail){
734     count++;
735     head++;
736     if(head == q->end)
737       head = q->bgn;
738   }
739
740   result = (void **)CmiAlloc(count * sizeof(void *));
741   i = 0;
742   head = q->head;
743   tail = q->tail;
744   while(head != tail){
745     result[i] = *head;
746     i++;
747     head++;
748     if(head == q->end)
749       head = q->bgn;
750   }
751   *num = count;
752   return(result);
753 }
754
755 /** Produce an array containing all the entries in a prioq
756     @return a newly allocated array filled with copies of the (void*) elements in the prioq. 
757     @param [in] q a deq
758     @param [out] num the number of pointers in the returned array
759 */
760 void** CqsEnumeratePrioq(prioq q, int *num){
761   void **head, **tail;
762   void **result;
763   int i,j;
764   int count = 0;
765   prioqelt pe;
766
767   for(i = 1; i < q->heapnext; i++){
768     pe = (q->heap)[i];
769     head = pe->data.head;
770     tail = pe->data.tail;
771     while(head != tail){
772       count++;
773       head++;
774       if(head == (pe->data).end)
775         head = (pe->data).bgn;
776     }
777   }
778
779   result = (void **)CmiAlloc((count) * sizeof(void *));
780   *num = count;
781   
782   j = 0;
783   for(i = 1; i < q->heapnext; i++){
784     pe = (q->heap)[i];
785     head = pe->data.head;
786     tail = pe->data.tail;
787     while(head != tail){
788       result[j] = *head;
789       j++;
790       head++;
791       if(head ==(pe->data).end)
792         head = (pe->data).bgn; 
793     }
794   }
795
796   return result;
797 }
798
799 /** Produce an array containing all the entries in a Queue
800     @return a newly allocated array filled with copies of the (void*) elements in the Queue. 
801     @param [in] q a Queue
802     @param [out] resp an array of pointer entries found in the Queue
803 */
804 void CqsEnumerateQueue(Queue q, void ***resp){
805   void **result;
806   int num;
807   int i,j;
808
809   *resp = (void **)CmiAlloc(q->length * sizeof(void *));
810   j = 0;
811
812   result = CqsEnumeratePrioq(&(q->negprioq), &num);
813   for(i = 0; i < num; i++){
814     (*resp)[j] = result[i];
815     j++;
816   }
817   CmiFree(result);
818   
819   result = CqsEnumerateDeq(&(q->zeroprio), &num);
820   for(i = 0; i < num; i++){
821     (*resp)[j] = result[i];
822     j++;
823   }
824   CmiFree(result);
825
826   result = CqsEnumeratePrioq(&(q->posprioq), &num);
827   for(i = 0; i < num; i++){
828     (*resp)[j] = result[i];
829     j++;
830   }
831   CmiFree(result);
832 }
833
834
835
836
837 /** Remove first occurence of a specified entry from the deq  by setting the entry to NULL.
838     The size of the deq will not change, it will now just contain an entry for a NULL pointer.
839
840     @return number of entries that were replaced with NULL
841 */
842 int CqsRemoveSpecificDeq(deq q, const void *msgPtr){
843   void **head, **tail;
844
845   head = q->head;
846   tail = q->tail;
847
848   while(head != tail){
849     if(*head == msgPtr){
850       //    CmiPrintf("Replacing %p in deq with NULL\n", msgPtr);
851       //     *head = NULL;
852       return 1;
853     }
854     head++;
855     if(head == q->end)
856       head = q->bgn;
857   }
858   return 0;
859 }
860
861
862
863 /** Remove first occurence of a specified entry from the prioq by setting the entry to NULL.
864     The size of the prioq will not change, it will now just contain an entry for a NULL pointer.
865
866     @return number of entries that were replaced with NULL
867 */
868 int CqsRemoveSpecificPrioq(prioq q, const void *msgPtr){
869   void **head, **tail;
870   void **result;
871   int i;
872   prioqelt pe;
873
874   for(i = 1; i < q->heapnext; i++){
875     pe = (q->heap)[i];
876     head = pe->data.head;
877     tail = pe->data.tail;
878     while(head != tail){
879       if(*head == msgPtr){
880         //      CmiPrintf("Replacing %p in prioq with NULL\n", msgPtr);
881         *head = NULL;
882         return 1;
883       }     
884       head++;
885       if(head == (pe->data).end)
886         head = (pe->data).bgn;
887     }
888   } 
889   return 0;
890 }
891
892
893
894 /** Remove an occurence of a specified entry from the Queue by setting its entry to NULL. 
895     The size of the Queue will not change, it will now just contain an entry for a NULL pointer.
896 */
897 void CqsRemoveSpecific(Queue q, const void *msgPtr){
898   if( CqsRemoveSpecificPrioq(&(q->negprioq), msgPtr) == 0 )
899     if( CqsRemoveSpecificDeq(&(q->zeroprio), msgPtr) == 0 )  
900       if(CqsRemoveSpecificPrioq(&(q->posprioq), msgPtr) == 0){
901         CmiPrintf("Didn't remove the specified entry because it was not found\n");
902       }  
903 }
904
905
906
907
908
909
910
911
912
913
914 /** @} */