a bug fix for LFIFO long priority queue. For little endian machines such as intel...
[charm.git] / src / conv-core / queueing.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <converse.h>
9 #include <string.h>
10 #include "queueing.h"
11
12 static void CqsDeqInit(d)
13 deq d;
14 {
15   d->bgn  = d->space;
16   d->end  = d->space+4;
17   d->head = d->space;
18   d->tail = d->space;
19 }
20
21 static void CqsDeqExpand(d)
22 deq d;
23 {
24   int rsize = (d->end - d->head);
25   int lsize = (d->head - d->bgn);
26   int oldsize = (d->end - d->bgn);
27   int newsize = (oldsize << 1);
28   void **ovec = d->bgn;
29   void **nvec = (void **)CmiAlloc(newsize * sizeof(void *));
30   memcpy(nvec, d->head, rsize * sizeof(void *));
31   memcpy(nvec+rsize, d->bgn, lsize * sizeof(void *));
32   d->bgn = nvec;
33   d->end = nvec + newsize;
34   d->head = nvec;
35   d->tail = nvec + oldsize;
36   if (ovec != d->space) CmiFree(ovec);
37 }
38
39 void CqsDeqEnqueueFifo(d, data)
40 deq d; void *data;
41 {
42   void **tail = d->tail;
43   *tail = data;
44   tail++;
45   if (tail == d->end) tail = d->bgn;
46   d->tail = tail;
47   if (tail == d->head) CqsDeqExpand(d);
48 }
49
50 void CqsDeqEnqueueLifo(d, data)
51 deq d; void *data;
52 {
53   void **head = d->head;
54   if (head == d->bgn) head = d->end;
55   head--;
56   *head = data;
57   d->head = head;
58   if (head == d->tail) CqsDeqExpand(d);
59 }
60
61 void *CqsDeqDequeue(d)
62 deq d;
63 {
64   void **head;
65   void **tail;
66   void *data;
67   head = d->head;
68   tail = d->tail;
69   if (head == tail) return 0;
70   data = *head;
71   head++;
72   if (head == d->end) head = d->bgn;
73   d->head = head;
74   return data;
75 }
76
77 static void CqsPrioqInit(pq)
78 prioq pq;
79 {
80   int i;
81   pq->heapsize = 100;
82   pq->heapnext = 1;
83   pq->hash_key_size = PRIOQ_TABSIZE;
84   pq->hash_entry_size = 0;
85   pq->heap = (prioqelt *)CmiAlloc(100 * sizeof(prioqelt));
86   pq->hashtab = (prioqelt *)CmiAlloc(pq->hash_key_size * sizeof(prioqelt));
87   for (i=0; i<pq->hash_key_size; i++) pq->hashtab[i]=0;
88 }
89
90 #if CMK_C_INLINE
91 inline
92 #endif
93 static void CqsPrioqExpand(prioq pq)
94 {
95   int oldsize = pq->heapsize;
96   int newsize = oldsize * 2;
97   prioqelt *oheap = pq->heap;
98   prioqelt *nheap = (prioqelt *)CmiAlloc(newsize*sizeof(prioqelt));
99   memcpy(nheap, oheap, oldsize * sizeof(prioqelt));
100   pq->heap = nheap;
101   pq->heapsize = newsize;
102   CmiFree(oheap);
103 }
104 #ifndef FASTQ
105 void CqsPrioqRehash(pq)
106      prioq pq;
107 {
108   int oldHsize = pq->hash_key_size;
109   int newHsize = oldHsize * 2;
110   unsigned int hashval;
111   prioqelt pe, pe1, pe2;
112   int i,j;
113
114   prioqelt *ohashtab = pq->hashtab;
115   prioqelt *nhashtab = (prioqelt *)CmiAlloc(newHsize*sizeof(prioqelt));
116
117   pq->hash_key_size = newHsize;
118
119   for(i=0; i<newHsize; i++)
120     nhashtab[i] = 0;
121
122   for(i=0; i<oldHsize; i++) {
123     for(pe=ohashtab[i]; pe; ) {
124       pe2 = pe->ht_next;
125       hashval = pe->pri.bits;
126       for (j=0; j<pe->pri.ints; j++) hashval ^= pe->pri.data[j];
127       hashval = (hashval&0x7FFFFFFF)%newHsize;
128
129       pe1=nhashtab[hashval];
130       pe->ht_next = pe1;
131       pe->ht_handle = (nhashtab+hashval);
132       if (pe1) pe1->ht_handle = &(pe->ht_next);
133       nhashtab[hashval]=pe;
134       pe = pe2;
135     }
136   }
137   pq->hashtab = nhashtab;
138   pq->hash_key_size = newHsize;
139   CmiFree(ohashtab);
140 }
141 #endif
142 /*
143  * This routine compares priorities. It returns:
144  * 
145  * 1 if prio1 > prio2
146  * ? if prio1 == prio2
147  * 0 if prio1 < prio2
148  *
149  * where prios are treated as unsigned
150  */
151
152 int CqsPrioGT(prio1, prio2)
153 prio prio1;
154 prio prio2;
155 {
156 #ifndef FASTQ
157   unsigned int ints1 = prio1->ints;
158   unsigned int ints2 = prio2->ints;
159 #endif
160   unsigned int *data1 = prio1->data;
161   unsigned int *data2 = prio2->data;
162 #ifndef FASTQ
163   unsigned int val1;
164   unsigned int val2;
165 #endif
166   while (1) {
167 #ifndef FASTQ
168     if (ints1==0) return 0;
169     if (ints2==0) return 1;
170 #else
171     if (prio1->ints==0) return 0;
172     if (prio2->ints==0) return 1;
173 #endif
174 #ifndef FASTQ
175     val1 = *data1++;
176     val2 = *data2++;
177     if (val1 < val2) return 0;
178     if (val1 > val2) return 1;
179     ints1--;
180     ints2--;
181 #else
182     if(*data1++ < *data2++) return 0;
183     if(*data1++ > *data2++) return 1;
184     (prio1->ints)--;
185     (prio2->ints)--;
186 #endif
187   }
188 }
189
190 deq CqsPrioqGetDeq(pq, priobits, priodata)
191 prioq pq;
192 unsigned int priobits, *priodata;
193 {
194   unsigned int prioints = (priobits+CINTBITS-1)/CINTBITS;
195   unsigned int hashval, i;
196   int heappos; 
197   prioqelt *heap, pe, next, parent;
198   prio pri;
199   int mem_cmp_res;
200   unsigned int pri_bits_cmp;
201   static int cnt_nilesh=0;
202
203 #ifdef FASTQ
204   /*  printf("Hi I'm here %d\n",cnt_nilesh++); */
205 #endif
206   /* Scan for priority in hash-table, and return it if present */
207   hashval = priobits;
208   for (i=0; i<prioints; i++) hashval ^= priodata[i];
209   hashval = (hashval&0x7FFFFFFF)%PRIOQ_TABSIZE;
210 #ifndef FASTQ
211   for (pe=pq->hashtab[hashval]; pe; pe=pe->ht_next)
212     if (priobits == pe->pri.bits)
213       if (memcmp(priodata, pe->pri.data, sizeof(int)*prioints)==0)
214         return &(pe->data);
215 #else
216   parent=NULL;
217   for(pe=pq->hashtab[hashval]; pe; )
218   {
219     parent=pe;
220     pri_bits_cmp=pe->pri.bits;
221     mem_cmp_res=memcmp(priodata,pe->pri.data,sizeof(int)*prioints);
222     if(priobits == pri_bits_cmp && mem_cmp_res==0)
223       return &(pe->data);
224     else if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
225     {
226       pe=pe->ht_right;
227     }
228     else 
229     {
230       pe=pe->ht_left;
231     }
232   }
233 #endif
234   
235   /* If not present, allocate a bucket for specified priority */
236   pe = (prioqelt)CmiAlloc(sizeof(struct prioqelt_struct)+((prioints-1)*sizeof(int)));
237   pe->pri.bits = priobits;
238   pe->pri.ints = prioints;
239   memcpy(pe->pri.data, priodata, (prioints*sizeof(int)));
240   CqsDeqInit(&(pe->data));
241   pri=&(pe->pri);
242
243   /* Insert bucket into hash-table */
244   next = pq->hashtab[hashval];
245 #ifndef FASTQ
246   pe->ht_next = next;
247   pe->ht_handle = (pq->hashtab+hashval);
248   if (next) next->ht_handle = &(pe->ht_next);
249   pq->hashtab[hashval] = pe;
250 #else
251   pe->ht_parent = parent;
252   pe->ht_left = NULL;
253   pe->ht_right = NULL;
254   if(priobits > pri_bits_cmp || (priobits == pri_bits_cmp && mem_cmp_res>0))
255   {
256     if(parent) {
257       parent->ht_right = pe;
258       pe->ht_handle = &(parent->ht_right);
259     }
260     else {
261       pe->ht_handle = (pq->hashtab+hashval);
262       pq->hashtab[hashval] = pe;
263     }
264     /*    pe->ht_handle = &(pe); */
265   }
266   else
267   {
268     if(parent) {
269       parent->ht_left = pe;
270       pe->ht_handle = &(parent->ht_left);
271     }
272     else {
273       pe->ht_handle = (pq->hashtab+hashval);
274       pq->hashtab[hashval] = pe;
275     }
276     /*    pe->ht_handle = &(pe); */
277   }
278   if(!next)
279     pq->hashtab[hashval] = pe;
280 #endif
281   pq->hash_entry_size++;
282 #ifndef FASTQ
283   if(pq->hash_entry_size > 2*pq->hash_key_size)
284     CqsPrioqRehash(pq);
285 #endif  
286   /* Insert bucket into heap */
287   heappos = pq->heapnext++;
288   if (heappos == pq->heapsize) CqsPrioqExpand(pq);
289   heap = pq->heap;
290   while (heappos > 1) {
291     int parentpos = (heappos >> 1);
292     prioqelt parent = heap[parentpos];
293     if (CqsPrioGT(pri, &(parent->pri))) break;
294     heap[heappos] = parent; heappos=parentpos;
295   }
296   heap[heappos] = pe;
297
298 #ifdef FASTQ
299   /*  printf("Hi I'm here222\n"); */
300 #endif
301   
302   return &(pe->data);
303 }
304
305 void *CqsPrioqDequeue(pq)
306 prioq pq;
307 {
308   prio pri;
309   prioqelt pe, old; void *data;
310   int heappos, heapnext;
311   prioqelt *heap = pq->heap;
312   int left_child;
313   prioqelt temp1_ht_right, temp1_ht_left, temp1_ht_parent;
314   prioqelt *temp1_ht_handle;
315   static int cnt_nilesh1=0;
316
317 #ifdef FASTQ
318   /*  printf("Hi I'm here too!! %d\n",cnt_nilesh1++); */
319 #endif
320   if (pq->heapnext==1) return 0;
321   pe = heap[1];
322   data = CqsDeqDequeue(&(pe->data));
323   if (pe->data.head == pe->data.tail) {
324     /* Unlink prio-bucket from hash-table */
325 #ifndef FASTQ
326     prioqelt next = pe->ht_next;
327     prioqelt *handle = pe->ht_handle;
328     if (next) next->ht_handle = handle;
329     *handle = next;
330     old=pe;
331 #else
332     old=pe;
333     prioqelt *handle;
334     if(pe->ht_parent)
335     { 
336       if(pe->ht_parent->ht_left==pe) left_child=1;
337       else left_child=0;
338     }
339     else
340       {  /* it is the root in the hashtable entry, so its ht_handle should be used by whoever is the new root */
341       handle = pe->ht_handle;
342     }
343     
344     if(!pe->ht_left && !pe->ht_right)
345     {
346       if(pe->ht_parent) {
347         if(left_child) pe->ht_parent->ht_left=NULL;
348         else pe->ht_parent->ht_right=NULL;
349       }
350       else {
351         *handle = NULL;
352       }
353     }
354     else if(!pe->ht_right)
355     {
356       /*if the node does not have a right subtree, its left subtree root is the new child of its parent */
357       pe->ht_left->ht_parent=pe->ht_parent;
358       if(pe->ht_parent)
359       {
360         if(left_child) {
361           pe->ht_parent->ht_left = pe->ht_left;
362           pe->ht_left->ht_handle = &(pe->ht_parent->ht_left);
363         }
364         else {
365           pe->ht_parent->ht_right = pe->ht_left;
366           pe->ht_left->ht_handle = &(pe->ht_parent->ht_right);
367         }
368       }
369       else {
370         pe->ht_left->ht_handle = handle;
371         *handle = pe->ht_left;
372       }
373     }
374     else if(!pe->ht_left)
375     {
376       /*if the node does not have a left subtree, its right subtree root is the new child of its parent */
377       pe->ht_right->ht_parent=pe->ht_parent;
378       /*pe->ht_right->ht_left=pe->ht_left; */
379       if(pe->ht_parent)
380       {
381         if(left_child) {
382           pe->ht_parent->ht_left = pe->ht_right;
383           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
384         }
385         else {
386           pe->ht_parent->ht_right = pe->ht_right;
387           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
388         }
389       }
390       else {
391         pe->ht_right->ht_handle = handle;
392         *handle = pe->ht_right;
393       }
394     }
395     else if(!pe->ht_right->ht_left)
396     {
397       pe->ht_right->ht_parent=pe->ht_parent;
398       if(pe->ht_parent)
399       {
400         if(left_child) {
401           pe->ht_parent->ht_left = pe->ht_right;
402           pe->ht_right->ht_handle = &(pe->ht_parent->ht_left);
403         }
404         else {
405           pe->ht_parent->ht_right = pe->ht_right;
406           pe->ht_right->ht_handle = &(pe->ht_parent->ht_right);
407         }
408       }
409       else {
410         pe->ht_right->ht_handle = handle;
411         *handle = pe->ht_right;
412       }
413       if(pe->ht_left) {
414         pe->ht_right->ht_left = pe->ht_left;
415         pe->ht_left->ht_parent = pe->ht_right;
416         pe->ht_left->ht_handle = &(pe->ht_right->ht_left);
417       }
418     }
419     else
420     {
421       /*if it has both subtrees, swap it with its successor */
422       for(pe=pe->ht_right; pe; )
423       {
424         if(pe->ht_left) pe=pe->ht_left;
425         else  /*found the sucessor */
426           { /*take care of the connections */
427           if(old->ht_parent)
428           {
429             if(left_child) {
430               old->ht_parent->ht_left = pe;
431               pe->ht_handle = &(old->ht_parent->ht_left);
432             }
433             else {
434               old->ht_parent->ht_right = pe;
435               pe->ht_handle = &(old->ht_parent->ht_right);
436             }
437           }
438           else {
439             pe->ht_handle = handle;
440             *handle = pe;
441           }
442           temp1_ht_right = pe->ht_right;
443           temp1_ht_left = pe->ht_left;
444           temp1_ht_parent = pe->ht_parent;
445           temp1_ht_handle = pe->ht_handle;
446
447           pe->ht_parent = old->ht_parent;
448           pe->ht_left = old->ht_left;
449           pe->ht_right = old->ht_right;
450           if(pe->ht_left) {
451             pe->ht_left->ht_parent = pe;
452             pe->ht_right->ht_handle = &(pe->ht_right);
453           }
454           if(pe->ht_right) {
455             pe->ht_right->ht_parent = pe;
456             pe->ht_right->ht_handle = &(pe->ht_right);
457           }
458           temp1_ht_parent->ht_left = temp1_ht_right;
459           if(temp1_ht_right) {
460             temp1_ht_right->ht_handle = &(temp1_ht_parent->ht_left);
461             temp1_ht_right->ht_parent = temp1_ht_parent;
462           }
463           break;
464         }
465       }
466     }
467 #endif
468     pq->hash_entry_size--;
469     
470     /* Restore the heap */
471     heapnext = (--pq->heapnext);
472     pe = heap[heapnext];
473     pri = &(pe->pri);
474     heappos = 1;
475     while (1) {
476       int childpos1, childpos2, childpos;
477       prioqelt ch1, ch2, child;
478       childpos1 = heappos<<1;
479       if (childpos1>=heapnext) break;
480       childpos2 = childpos1+1;
481       if (childpos2>=heapnext)
482         { childpos=childpos1; child=heap[childpos1]; }
483       else {
484         ch1 = heap[childpos1];
485         ch2 = heap[childpos2];
486         if (CqsPrioGT(&(ch1->pri), &(ch2->pri)))
487              {childpos=childpos2; child=ch2;}
488         else {childpos=childpos1; child=ch1;}
489       }
490       if (CqsPrioGT(&(child->pri), pri)) break;
491       heap[heappos]=child; heappos=childpos;
492     }
493     heap[heappos]=pe;
494     
495     /* Free prio-bucket */
496     if (old->data.bgn != old->data.space) CmiFree(old->data.bgn);
497     CmiFree(old);
498   }
499   return data;
500 }
501
502 Queue CqsCreate(void)
503 {
504   Queue q = (Queue)CmiAlloc(sizeof(struct Queue_struct));
505   q->length = 0;
506   q->maxlen = 0;
507 #ifdef FASTQ
508   /*  printf("\nIN fastq"); */
509 #endif
510   CqsDeqInit(&(q->zeroprio));
511   CqsPrioqInit(&(q->negprioq));
512   CqsPrioqInit(&(q->posprioq));
513   return q;
514 }
515
516 void CqsDelete(Queue q)
517 {
518   CmiFree(q->negprioq.heap);
519   CmiFree(q->posprioq.heap);
520   CmiFree(q);
521 }
522
523 unsigned int CqsLength(Queue q)
524 {
525   return q->length;
526 }
527
528 unsigned int CqsMaxLength(Queue q)
529 {
530   return q->maxlen;
531 }
532
533 int CqsEmpty(Queue q)
534 {
535   return (q->length == 0);
536 }
537
538 void CqsEnqueueGeneral(Queue q, void *data, int strategy, 
539            int priobits,unsigned int *prioptr)
540 {
541   deq d; int iprio;
542   CmiInt8 lprio0, lprio;
543   switch (strategy) {
544   case CQS_QUEUEING_FIFO: 
545     CqsDeqEnqueueFifo(&(q->zeroprio), data); 
546     break;
547   case CQS_QUEUEING_LIFO: 
548     CqsDeqEnqueueLifo(&(q->zeroprio), data); 
549     break;
550   case CQS_QUEUEING_IFIFO:
551     iprio=prioptr[0]+(1U<<(CINTBITS-1));
552     if ((int)iprio<0)
553       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
554     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
555     CqsDeqEnqueueFifo(d, data);
556     break;
557   case CQS_QUEUEING_ILIFO:
558     iprio=prioptr[0]+(1U<<(CINTBITS-1));
559     if ((int)iprio<0)
560       d=CqsPrioqGetDeq(&(q->posprioq), CINTBITS, &iprio);
561     else d=CqsPrioqGetDeq(&(q->negprioq), CINTBITS, &iprio);
562     CqsDeqEnqueueLifo(d, data);
563     break;
564   case CQS_QUEUEING_BFIFO:
565     if (priobits&&(((int)(prioptr[0]))<0))
566        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
567     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
568     CqsDeqEnqueueFifo(d, data);
569     break;
570   case CQS_QUEUEING_BLIFO:
571     if (priobits&&(((int)(prioptr[0]))<0))
572        d=CqsPrioqGetDeq(&(q->posprioq), priobits, prioptr);
573     else d=CqsPrioqGetDeq(&(q->negprioq), priobits, prioptr);
574     CqsDeqEnqueueLifo(d, data);
575     break;
576
577   case CQS_QUEUEING_LFIFO:     
578     CmiAssert(priobits == CLONGBITS);
579     lprio0 =((CmiUInt8 *)prioptr)[0];
580     lprio0 += (1ULL<<(CLONGBITS-1));
581     if (CmiEndianness() == 0) {           /* little-endian */
582       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
583     }
584     else {                /* little-endian */
585       lprio = lprio0;
586     }
587     if (lprio0<0)
588         d=CqsPrioqGetDeq(&(q->posprioq), priobits, &lprio);
589     else
590         d=CqsPrioqGetDeq(&(q->negprioq), priobits, &lprio);
591     CqsDeqEnqueueFifo(d, data);
592     break;
593   case CQS_QUEUEING_LLIFO:
594     lprio0 =((CmiUInt8 *)prioptr)[0];
595     lprio0 += (1ULL<<(CLONGBITS-1));
596     if (CmiEndianness() == 0) {           /* little-endian happen to compare least significant part first */
597       lprio =(((CmiUInt4 *)&lprio0)[0]*1LL)<<CINTBITS | ((CmiUInt4 *)&lprio0)[1];
598     }
599     else {                /* little-endian */
600       lprio = lprio0;
601     }
602     if (lprio0<0)
603         d=CqsPrioqGetDeq(&(q->posprioq), priobits, &lprio);
604     else
605         d=CqsPrioqGetDeq(&(q->negprioq), priobits, &lprio);
606     CqsDeqEnqueueLifo(d, data);
607     break;
608   default:
609     CmiAbort("CqsEnqueueGeneral: invalid queueing strategy.\n");
610   }
611   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
612 }
613
614 void CqsEnqueueFifo(Queue q, void *data)
615 {
616   CqsDeqEnqueueFifo(&(q->zeroprio), data);
617   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
618 }
619
620 void CqsEnqueueLifo(Queue q, void *data)
621 {
622   CqsDeqEnqueueLifo(&(q->zeroprio), data);
623   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
624 }
625
626 void CqsEnqueue(Queue q, void *data)
627 {
628   CqsDeqEnqueueFifo(&(q->zeroprio), data);
629   q->length++; if (q->length>q->maxlen) q->maxlen=q->length;
630 }
631
632 void CqsDequeue(Queue q, void **resp)
633 {
634   if (q->length==0) 
635     { *resp = 0; return; }
636   if (q->negprioq.heapnext>1)
637     { *resp = CqsPrioqDequeue(&(q->negprioq)); q->length--; return; }
638   if (q->zeroprio.head != q->zeroprio.tail)
639     { *resp = CqsDeqDequeue(&(q->zeroprio)); q->length--; return; }
640   if (q->posprioq.heapnext>1)
641     { *resp = CqsPrioqDequeue(&(q->posprioq)); q->length--; return; }
642   *resp = 0; return;
643 }
644
645 static struct prio_struct kprio_zero = { 0, 0, {0} };
646 static struct prio_struct kprio_max  = { 32, 1, {((unsigned int)(-1))} };
647
648 prio CqsGetPriority(q)
649 Queue q;
650 {
651   if (q->negprioq.heapnext>1) return &(q->negprioq.heap[1]->pri);
652   if (q->zeroprio.head != q->zeroprio.tail) { return &kprio_zero; }
653   if (q->posprioq.heapnext>1) return &(q->posprioq.heap[1]->pri);
654   return &kprio_max;
655 }
656
657 prio CqsGetSecondPriority(q)
658 Queue q;
659 {
660   return CqsGetPriority(q);
661 }
662
663 void** CqsEnumerateDeq(deq q, int *num){
664   void **head, **tail;
665   void **result;
666   int count = 0;
667   int i;
668
669   head = q->head;
670   tail = q->tail;
671
672   while(head != tail){
673     count++;
674     head++;
675     if(head == q->end)
676       head = q->bgn;
677   }
678
679   result = (void **)CmiAlloc(count * sizeof(void *));
680   i = 0;
681   head = q->head;
682   tail = q->tail;
683   while(head != tail){
684     result[i] = *head;
685     i++;
686     head++;
687     if(head == q->end)
688       head = q->bgn;
689   }
690   *num = count;
691   return(result);
692 }
693
694 void** CqsEnumeratePrioq(prioq q, int *num){
695   void **head, **tail;
696   void **result;
697   int i,j;
698   int count = 0;
699   prioqelt pe;
700
701   for(i = 1; i < q->heapnext; i++){
702     pe = (q->heap)[i];
703     head = pe->data.head;
704     tail = pe->data.tail;
705     while(head != tail){
706       count++;
707       head++;
708       if(head == (pe->data).end)
709         head = (pe->data).bgn;
710     }
711   }
712
713   result = (void **)CmiAlloc(count * sizeof(void *));
714   *num = count;
715   
716   j = 0;
717   for(i = 1; i < q->heapnext; i++){
718     pe = (q->heap)[i];
719     head = pe->data.head;
720     tail = pe->data.tail;
721     while(head != tail){
722       result[j] = *head;
723       j++;
724       head++;
725       if(head ==(pe->data).end)
726         head = (pe->data).bgn; 
727     }
728   }
729
730   return result;
731 }
732
733 void CqsEnumerateQueue(Queue q, void ***resp){
734   void **result;
735   int num;
736   int i,j;
737
738   *resp = (void **)CmiAlloc(q->length * sizeof(void *));
739   j = 0;
740
741   result = CqsEnumeratePrioq(&(q->negprioq), &num);
742   for(i = 0; i < num; i++){
743     (*resp)[j] = result[i];
744     j++;
745   }
746   CmiFree(result);
747   
748   result = CqsEnumerateDeq(&(q->zeroprio), &num);
749   for(i = 0; i < num; i++){
750     (*resp)[j] = result[i];
751     j++;
752   }
753   CmiFree(result);
754
755   result = CqsEnumeratePrioq(&(q->posprioq), &num);
756   for(i = 0; i < num; i++){
757     (*resp)[j] = result[i];
758     j++;
759   }
760   CmiFree(result);
761 }
762