doc: Add serial to list of ci file reserved words
[charm.git] / src / conv-core / cpath.c
1 #include <converse.h>
2 #include <stdarg.h>
3
4 /******************************************************************************
5  *
6  * Data definitions and globals
7  *
8  *****************************************************************************/
9
10 #define  CrcGenericAlign 8
11 #define  CrcGenericAlignInt(n) (((n)+CrcGenericAlign-1)&(~(CrcGenericAlign-1)))
12 #define  CrcGenericAlignPtr(p) ((void*)CrcGenericAlignInt((size_t)(p)))
13
14 typedef struct buffer buffer;
15 typedef struct eltset    *eltset;
16 typedef struct single    *single;
17 typedef struct reduction *reduction;
18
19 struct buffer
20 {
21   unsigned char *data;
22   int            fill;
23   int            size;
24 };
25
26 struct eltset
27 {
28   eltset next;
29   CPath path;
30   int eltno;
31   int root;   /* who is the root of the set */
32   int inset;  /* True if I am in the set */
33   int parent; /* my parent in the set */
34   int child1; /* my first child in the set */
35   int child2; /* my second child in the set */
36   reduction reductions; /* list of reductions in progress over this set */
37   int nlocals;
38   single locals[1];
39 };
40
41 struct single
42 {
43   single next;
44   CPath path;
45   int eltno;
46   int waiting;
47   CmmTable mm;
48   CthThread thread;
49 };
50
51 struct reduction
52 {
53   eltset over;
54   CPath dest; int desteltno;
55   int reducer;
56   int vecsize; int eltsize;
57   char *msg_begin, *msg_data, *msg_end;
58   int anydata, waitcount;
59   struct reduction *next;
60   int ntags;
61   int tags[1];
62 };
63
64 CpvStaticDeclare(int *,  PElist);
65 CpvStaticDeclare(char *, PEflags);
66 CpvStaticDeclare(int, seqno);
67 CpvStaticDeclare(int, CPathSendIdx);
68 CpvStaticDeclare(int, CPathReduceIdx);
69 CpvStaticDeclare(eltset *, EltsetTable);
70 CpvStaticDeclare(int, EltsetTabsize);
71 CpvStaticDeclare(single *, SingleTable);
72 CpvStaticDeclare(int, SingleTabsize);
73 CpvStaticDeclare(reduction *, ReduceTable);
74 CpvStaticDeclare(int, ReduceTabsize);
75 CtvStaticDeclare(single, thisthread);
76
77 /******************************************************************************
78  *
79  * bufalloc
80  *
81  * Given a pointer to a buffer, allocates N aligned bytes at the end
82  * of the buffer.  Resizes the buffer if necessary.
83  *
84  *****************************************************************************/
85
86 static void *bufalloc(buffer *b, int n)
87 {
88   int fill = CrcGenericAlignInt(b->fill);
89   int nfill = fill+n;
90   int nsize; char *ndata;
91
92   b->fill = nfill;
93   if (nfill > b->size) {
94     nsize = nfill*2;
95     b->size = nsize;
96     ndata = (char*)CmiAlloc(nsize);
97     memcpy(ndata, b->data, fill);
98     CmiFree(b->data);
99     b->data = ndata;
100   }
101   return b->data + fill;
102 }
103
104 /******************************************************************************
105  *
106  * CPathIndicesToEltno
107  * CPathEltnoToIndices
108  *
109  * Converts an index-set to a more compact representation and back again.
110  *
111  *****************************************************************************/
112
113 unsigned int CPathIndicesToEltno(int nsizes, int *sizes, int *indices)
114 {
115   unsigned int size, eltno; int i, index;
116   eltno=0;
117   for (i=0; i<nsizes; i++) {
118     size = sizes[i];
119     index = indices[i];
120     eltno *= (size + 1);
121     if (index == CPATH_ALL) index = size;
122     eltno += index;
123   }
124   return eltno;
125 }
126
127 void CPathEltnoToIndices(int nsizes, int *sizes,
128                          unsigned int eltno, int *indices)
129 {
130   unsigned int size; int i, index;
131   for (i=nsizes-1; i>=0; i--) {
132     size = sizes[i];
133     index = eltno % (size + 1);
134     eltno /= (size + 1);
135     if (index == size) index = CPATH_ALL;
136     indices[i] = index;
137   }
138 }
139
140 /******************************************************************************
141  *
142  * CPathGetEltset
143  *
144  * Given an element set specified as a path and set of indices (possibly
145  * with wildcards), returns the information about that set of elements.
146  *
147  * Note to reimplementors: this function has a lot of local variables.
148  * It just goes down the list and fills them in one by one, top to bottom.
149  *
150  *****************************************************************************/
151
152 int CPathMap(CPath *path, int *indices)
153 {
154   CPathMapFn mapper = (CPathMapFn)CmiHandlerToFunction(path->mapfn);
155   return mapper(path, indices) % CmiNumPes();
156 }
157
158 int CPathRoot(CPath *path, int *indices)
159 {
160   int nindices[13], i;
161   for (i=0; i<path->nsizes; i++) {
162     nindices[i] = indices[i];
163     if (nindices[i] == CPATH_ALL)
164       nindices[i] = path->sizes[i] - 1;
165   }
166   return CPathMap(path, nindices);
167 }
168
169 void CPathExecute(single t)
170 {
171   unsigned int indices[13];
172   CthVoidFn fn = CmiHandlerToFunction(t->path.startfn);
173   CtvAccess(thisthread) = t;
174   CPathEltnoToIndices(t->path.nsizes, t->path.sizes, t->eltno, indices);
175   fn(&(t->path), indices);
176   /* Insert garbage-collection code here */
177 }
178
179 single CPathGetSingle(CPath *path, int *indices)
180 {
181   single *table = CpvAccess(SingleTable);
182   unsigned int eltno = CPathIndicesToEltno(path->nsizes, path->sizes, indices);
183   unsigned int hashval = path->seqno ^ path->creator ^ eltno;
184   unsigned int bucket = hashval % CpvAccess(SingleTabsize);
185   single result;
186
187   for (result=table[bucket]; result; result=result->next) {
188     if ((result->path.seqno == path->seqno)&&
189         (result->path.creator == path->creator)&&
190         (result->eltno == eltno))
191       return result;
192   }
193   
194   result = (single)malloc(sizeof(struct single));
195   _MEMCHECK(result);
196   result->path = *path;
197   result->eltno = eltno;
198   result->waiting = 0;
199   result->mm = CmmNew();
200   result->thread = CthCreate(CPathExecute, result, 0);
201   result->next = table[bucket];
202   table[bucket] = result;
203   CthAwaken(result->thread);
204   return result;
205 }
206
207 eltset CPathAllocEltset(eltset old, int nlocals)
208 {
209   return (eltset)realloc(old, sizeof(struct eltset) + nlocals*sizeof(single));
210 }
211
212 void CPathDecrementOdometer(int *odometer, int *sizes, int nvary, int *vary)
213 {
214   int i = 0;
215   while (odometer[vary[i]]==0) {
216     odometer[vary[i]] = sizes[vary[i]] - 1;
217     i++;
218   }
219   odometer[vary[i]]--;
220 }
221
222 eltset CPathGetEltset(CPath *path, int *indices)
223 {
224   int           nsizes;      /* extracted from path */
225   int          *sizes;       /* extracted from path */
226   unsigned int  eltno;       /* element number associated with indices */
227   eltset       *table;       /* hash table containing eltsets */
228   unsigned int  hashval;     /* hash value of this eltset */
229   unsigned int  bucket;      /* hash table bucket holding this eltset */
230   int           nelts;       /* total size of eltset (not just local) */
231   int           nvary;       /* number of indices equal to CPATH_ALL */
232   int           vary[13];    /* list of indices equal to CPATH_ALL */
233   int           mype;        /* my processor number */
234   int           root;        /* processor holding first elt of set */
235   char         *peflags;     /* set of PEs containing elts */
236   int          *pelist;      /* list of PEs containing elts */
237   int           pecount;     /* count of PEs containing elts */
238   int           peself;      /* My position in the PE list */
239   eltset        result;      /* result buffer */
240   int           resultfill;  /* number of locals filled in result buffer */
241   int           resultsize;  /* number of locals available in result buffer */
242   int tsize, tpe, teltno, i, j;  /* temporary vars */
243
244   /* Compute nsizes, sizes */
245   nsizes = path->nsizes;
246   sizes  = path->sizes;
247   
248   /* Compute eltno, hashval and bucket, table */
249   eltno = CPathIndicesToEltno(nsizes, sizes, indices);
250   hashval = path->seqno ^ path->creator ^ eltno;
251   bucket = hashval % CpvAccess(EltsetTabsize);
252   table = CpvAccess(EltsetTable);
253   
254   /* Scan for eltset in hash table.  If present, return it. */
255   for (result=table[bucket]; result; result=result->next) {
256     if ((result->eltno == eltno) &&
257         (result->path.seqno == path->seqno) &&
258         (result->path.creator == path->creator)) {
259       return result;
260     }
261   }
262   
263   /* Compute nelts, vary, nvary. Initialize indices to starting values. */
264   nelts = 1; nvary = 0;
265   for (i=0; i<nsizes; i++) {
266     if (indices[i] == CPATH_ALL) {
267       tsize = sizes[i];
268       nelts *= tsize;
269       indices[i] = tsize - 1;
270       vary[nvary++] = i;
271     }
272   }
273   
274   /* Compute root, mype */
275   root = CPathMap(path, indices);
276   mype = CmiMyPe();
277   
278   /* put nelts==1 optimization here */
279   
280   /* Initialize the buffers */
281   pelist  = CpvAccess(PElist);
282   peflags = CpvAccess(PEflags);
283   pecount = 0;
284   peself  = -1;
285   resultfill = 0;
286   resultsize = (nelts > 10) ? 10 : nelts;
287   result = CPathAllocEltset(0, resultsize);
288   
289   /* Gen all elts. Compute peflags, pelist, pecount, peself, locals */
290   i = 0;
291   while (1) {
292     tpe = CPathMap(path, indices);
293     if (peflags[tpe]==0) {
294       if (tpe == mype) peself = pecount;
295       peflags[tpe] = 1;
296       pelist[pecount++] = tpe;
297     }
298     if (tpe == mype) {
299       if (resultfill == resultsize) {
300         resultsize <<= 1;
301         result = CPathAllocEltset(result, resultsize);
302       }
303       result->locals[resultfill++] = CPathGetSingle(path, indices);
304     }
305     i++; if (i==nelts) break;
306     CPathDecrementOdometer(indices, sizes, nvary, vary);
307   }
308   
309   /* clear the PEflags for later use, reset indices */
310   for (i=0; i<pecount; i++) peflags[pelist[i]] = 0;
311   for (i=0; i<nvary; i++) indices[vary[i]] = CPATH_ALL;
312   
313   /* fill in fields of result structure */
314   result = CPathAllocEltset(result, resultfill);
315   result->next = table[bucket];
316   table[bucket] = result;
317   result->path = (*path);
318   result->eltno = eltno;
319   result->root = root;
320   if (peself < 0) {
321     result->inset = 0;
322     result->parent = -1;
323     result->child1 = -1;
324     result->child2 = -1;
325   } else {
326     i = (peself+1)<<1;
327     j = i-1;
328     result->inset = 1;
329     result->parent = peself ? pelist[(peself-1)>>1] : -1;
330     result->child1 = (i<pecount) ? pelist[i] : -1;
331     result->child2 = (j<pecount) ? pelist[j] : -1;
332   }
333   result->nlocals = resultfill;
334   return result;
335 }
336
337 /******************************************************************************
338  *
339  * CPathSend
340  *
341  * Send a message to a thread or set of threads.  If you send to a set
342  * of elements, finds the root of the spanning tree and sends to that
343  * element.
344  *
345  *****************************************************************************/
346
347 void CPathSend(int key, ...)
348 {
349   buffer buf; va_list p; CPath *path;
350   char *src, *cpy; int root, i, vecsize, eltsize, size, idx, ntags, eltno;
351   int indices[13], tags[256], *tagv;
352   
353   buf.data = (unsigned char*)CmiAlloc(128);
354   buf.fill = CmiMsgHeaderSizeBytes;
355   buf.size = 128;
356   va_start(p, key);
357   
358   /* Insert path and indices into message */
359   switch(key) {
360   case CPATH_DEST:
361     path = va_arg(p, CPath *);
362     for (i=0; i<path->nsizes; i++)
363       indices[i] = va_arg(p, int);
364     break;
365   case CPATH_DESTELT:
366     path = va_arg(p, CPath *);
367     eltno = va_arg(p, int);
368     CPathEltnoToIndices(path->nsizes, path->sizes, eltno, indices);
369     break;
370   default: goto synerr;
371   }
372   cpy = bufalloc(&buf,sizeof(CPath));
373   memcpy(cpy, path, sizeof(CPath));
374   cpy = bufalloc(&buf,path->nsizes * sizeof(int));
375   memcpy(cpy, indices, path->nsizes * sizeof(int));
376   root = CPathRoot(path, indices);
377   
378   /* Insert tags into message */
379   key = va_arg(p, int);
380   switch (key) {
381   case CPATH_TAG:
382     ntags = 1;
383     tags[0] = va_arg(p, int);
384     tagv = tags;
385     break;
386   case CPATH_TAGS:
387     ntags = va_arg(p, int);
388     for (i=0; i<ntags; i++)
389       tags[i] = va_arg(p, int);
390     tagv = tags;
391     break;
392   case CPATH_TAGVEC:
393     ntags = va_arg(p, int);
394     tagv = va_arg(p, int*);
395     break;
396   }
397   cpy = bufalloc(&buf,(ntags+1)*sizeof(int));
398   ((int*)cpy)[0] = ntags;
399   for (i=0; i<ntags; i++)
400     ((int*)cpy)[i+1] = tagv[i];
401   
402   /* Insert data into message */
403   key = va_arg(p, int);
404   switch (key) {
405   case CPATH_BYTES:
406     size = va_arg(p, int);
407     src = va_arg(p, char*);
408     cpy = bufalloc(&buf,sizeof(int)+size);
409     *((int*)cpy) = size;
410     memcpy(cpy + sizeof(int), src, size);
411     break;
412   case CPATH_REDBYTES:
413     vecsize = va_arg(p, int);
414     eltsize = va_arg(p, int);
415     size = vecsize * eltsize;
416     src = va_arg(p, char *);
417     cpy = bufalloc(&buf,2*sizeof(int)+(vecsize*eltsize));
418     ((int*)cpy)[0] = size;
419     ((int*)cpy)[1] = vecsize;
420     memcpy(cpy + sizeof(int) + sizeof(int), src, size);
421     break;
422   default: goto synerr;
423   }
424   key = va_arg(p, int);
425   if (key!=CPATH_END) goto synerr;
426
427   /* send message to root of eltset */
428   CmiSetHandler(buf.data, CpvAccess(CPathSendIdx));
429   CmiSyncSendAndFree(root, buf.fill, buf.data);
430   return;
431   
432 synerr:
433   CmiError("CPathSend: syntax error in argument list.\n");
434   exit(1);
435 }
436
437 void CPathSendHandler(char *decoded)
438 {
439   char *p; CPath *path;
440   int *indices, *tags, ntags, mype, i, *tail, len;
441   eltset set; single sing; char *mend;
442
443   mype = CmiMyPe();
444   path = (CPath*)CrcGenericAlignPtr(decoded + CmiMsgHeaderSizeBytes);
445   indices = (int*)CrcGenericAlignPtr(path + 1);
446   tags = (int*)CrcGenericAlignPtr(indices + path->nsizes);
447   ntags = *tags++;
448   tail = (int*)CrcGenericAlignPtr(tags + ntags);
449   len = *tail++;
450   mend = ((char*)tail)+len;
451   /* put optimization here: check for a send which is just to me */
452   set = CPathGetEltset(path, indices);
453   if (set->child1 >= 0) CmiSyncSend(set->child1, mend-decoded, decoded);
454   if (set->child2 >= 0) CmiSyncSend(set->child2, mend-decoded, decoded);
455   /* this next statement is putting a refcount in the converse core */
456   *((int*)decoded) = set->nlocals;
457   for (i=0; i<set->nlocals; i++) {
458     sing = set->locals[i];
459     CmmPut(sing->mm, ntags, tags, decoded);
460     if (sing->waiting) {
461       sing->waiting = 0;
462       CthAwaken(sing->thread);
463     }
464   }
465 }
466
467 void *CPathRecv(int key, ...)
468 {
469   va_list p; int i, ntags, *tags; int tagbuf[256], rtags[256];
470   int *hlen; char **buffer; void *msg;
471   single t = CtvAccess(thisthread);
472   
473   va_start(p, key);
474   switch(key) {
475   case CPATH_TAG:
476     ntags = 1;
477     tagbuf[0] = va_arg(p, int);
478     tags = tagbuf;
479     break;
480   case CPATH_TAGS:
481     ntags = va_arg(p, int);
482     if (ntags>256) goto sizerr;
483     for (i=0; i<ntags; i++)
484       tagbuf[i] = va_arg(p, int);
485     tags = tagbuf;
486     break;
487   default: goto synerr;
488   }
489   
490   key = va_arg(p, int);
491   if (key != CPATH_END) goto synerr;
492   /* do something about the return tags someday */
493   while (1) {
494     msg = CmmGet(t->mm, ntags, tags, rtags);
495     if (msg) break;
496     t->waiting = 1;
497     CthSuspend();
498   }
499   return msg;
500   
501 synerr:
502   CmiError("CPathRecv: syntax error in argument list.\n");
503   exit(1);
504 sizerr:
505   CmiError("CPathRecv: too many tags.\n");
506   exit(1);
507 }
508
509 void CPathMsgDecodeBytes(void *msg, int *len, void *bytes)
510 {
511   CPath *path; int *indices; unsigned int *tags, ntags, *tail;
512   path = (CPath*)CrcGenericAlignPtr(((char*)msg)+CmiMsgHeaderSizeBytes);
513   indices = (int*)CrcGenericAlignPtr(path + 1);
514   tags = CrcGenericAlignPtr(indices + path->nsizes);
515   ntags = *tags++;
516   tail = CrcGenericAlignPtr(tags + ntags);
517   *len = *tail++;
518   *((void**)bytes) = (void*)tail;
519 }
520
521 void CPathMsgDecodeReduction(void *msg,int *vecsize,int *eltsize,void *bytes)
522 {
523   CPath *path; int *indices; unsigned int *tags, ntags, *tail, size;
524   path = (CPath*)CrcGenericAlignPtr(((char*)msg)+CmiMsgHeaderSizeBytes);
525   indices = (int*)CrcGenericAlignPtr(path + 1);
526   tags = CrcGenericAlignPtr(indices + path->nsizes);
527   ntags = *tags++;
528   tail = CrcGenericAlignPtr(tags + ntags);
529   size = *tail++;
530   *vecsize = *tail++;
531   *eltsize = size / (*vecsize);
532   *((void**)bytes) = (void*)tail;
533 }
534
535 void CPathMsgFree(void *msg)
536 {
537   int ref = ((int*)msg)[0] - 1;
538   if (ref==0) {
539     CmiFree(msg);
540   } else {
541     ((int*)msg)[0] = ref;
542   }
543 }
544
545 void CPathMakeArray(CPath *path, int startfn, int mapfn, ...)
546 {
547   int size, seq, pe, nsizes; va_list p;
548   va_start(p, mapfn);
549   nsizes = 0;
550   while (1) {
551     size = va_arg(p, int);
552     if (size==0) break;
553     if (nsizes==13) {
554       CmiError("CPathMakeArray: Limit of 13 dimensions.\n");
555       exit(1);
556     }
557     path->sizes[nsizes++] = size;
558   }
559   path->creator = CmiMyPe();
560   path->seqno = CpvAccess(seqno)++;
561   path->mapfn = mapfn;
562   path->startfn = startfn;
563   path->nsizes = nsizes;
564 }
565
566 /******************************************************************************
567  *
568  * CPathReduce(CPATH_OVER, array, i, j, k,
569  *             CPATH_TAGS, ...
570  *             CPATH_REDUCER, fn,
571  *             CPATH_BYTES, vecsize, eltsize, data,
572  *             CPATH_DEST, array, i, j, k,
573  *             CPATH_END);
574  *
575  *****************************************************************************/
576
577 void CPathMergeReduction(reduction red, void *data);
578
579 void CPathReduceMismatch()
580 {
581   CmiError("CPathReduce: all members of reduction do not agree on reduction parameters.\n");
582   exit(1);
583 }
584
585 void CPathCreateRedmsg(reduction red)
586 {
587   /* create a reduction message, leaving the data area */
588   /* uninitialized. format of reduction message is: */
589   /* over&dest,overeltno&desteltno&reducer&vecsize&eltsize&ntags,tags,data*/
590   
591   int o_paths, o_params, o_tags, o_data, o_end;
592   int i, *t; char *msg;
593   
594   o_paths = CrcGenericAlignInt(CmiMsgHeaderSizeBytes);
595   o_params = CrcGenericAlignInt(o_paths + 2*sizeof(CPath));
596   o_tags = CrcGenericAlignInt(o_params + 6*sizeof(int));
597   o_data = CrcGenericAlignInt(o_tags + red->ntags*sizeof(int));
598   o_end = o_data + (red->vecsize * red->eltsize);
599   
600   msg = (char*)CmiAlloc(o_end);
601   CmiSetHandler(msg, CpvAccess(CPathReduceIdx));
602   ((CPath*)(msg+o_paths))[0] = red->over->path;
603   ((CPath*)(msg+o_paths))[1] = red->dest;
604   ((int*)(msg+o_params))[0] = red->over->eltno;
605   ((int*)(msg+o_params))[1] = red->desteltno;
606   ((int*)(msg+o_params))[2] = red->reducer;
607   ((int*)(msg+o_params))[3] = red->vecsize;
608   ((int*)(msg+o_params))[4] = red->eltsize;
609   ((int*)(msg+o_params))[5] = red->ntags;
610   t = (int*)(msg+o_tags);
611   for (i=0; i<red->ntags; i++) *t++ = red->tags[i];
612   
613   red->msg_begin = msg;
614   red->msg_data = msg + o_data;
615   red->msg_end = msg + o_end;
616 }
617
618 reduction CPathGetReduction(eltset set, int ntags, int *tags,
619                             int vecsize, int eltsize,
620                             int reducer, CPath *dest, int desteltno)
621 {
622   reduction red; int i;
623   
624   for (red=set->reductions; red; red=red->next) {
625     if (red->ntags != ntags) continue;
626     for (i=0; i<ntags; i++)
627       if (red->tags[i] != tags[i]) continue;
628     
629     if (red->vecsize != vecsize) CPathReduceMismatch();
630     if (red->eltsize != eltsize) CPathReduceMismatch();
631     if (red->reducer != reducer) CPathReduceMismatch();
632     if (red->dest.creator != dest->creator) CPathReduceMismatch();
633     if (red->dest.seqno   != dest->seqno) CPathReduceMismatch();
634     if (red->desteltno    != desteltno) CPathReduceMismatch();
635     
636     return red;
637   }
638   
639   red = (reduction)malloc(sizeof(struct reduction) + ntags*sizeof(int));
640   _MEMCHECK(red);
641   
642   red->over = set;
643   red->ntags = ntags;
644   for (i=0; i<ntags; i++) red->tags[i] = tags[i];
645   red->vecsize = vecsize;
646   red->eltsize = eltsize;
647   red->reducer = reducer;
648   red->dest = (*dest);
649   red->desteltno = desteltno;
650   
651   red->anydata = 0;
652   red->waitcount = set->nlocals;
653   if (set->child1 >= 0) red->waitcount++;
654   if (set->child2 >= 0) red->waitcount++;
655
656   CPathCreateRedmsg(red);
657
658   red->next = set->reductions;
659   set->reductions = red;
660
661   return red;
662 }
663
664 void CPathReduceHandler(void *decoded)
665 {
666   /* over&dest,overeltno&desteltno&reducer&vecsize&eltsize&ntags,tags,data*/
667   CPath *paths, *over, *dest; int *params, *tags;
668   int o_paths, o_params, o_tags, o_data, o_end;
669   int overeltno, desteltno, reducer, vecsize, eltsize, ntags;
670   eltset set; reduction red; int overidx[13]; void *data;
671   
672   paths = (CPath*)CrcGenericAlignPtr(decoded + CmiMsgHeaderSizeBytes);
673   params = (int*)CrcGenericAlignPtr(paths + 2);
674   over = paths+0;
675   dest = paths+1;
676   overeltno = params[0];
677   desteltno = params[1];
678   reducer   = params[2];
679   vecsize   = params[3];
680   eltsize   = params[4];
681   ntags     = params[5];
682   tags = (int*)CrcGenericAlignPtr(params+6);
683   data = (void*)CrcGenericAlignPtr(tags + ntags);
684   
685   CPathEltnoToIndices(over->nsizes, over->sizes, overeltno, overidx);
686   set = CPathGetEltset(over, overidx);
687   red = CPathGetReduction
688     (set, ntags, tags, vecsize, eltsize, reducer, dest, desteltno);
689   CPathMergeReduction(red, data);
690 }
691
692 void CPathForwardReduction(reduction red)
693 {
694   int pe; eltset set; reduction *hred;
695   
696   pe = red->over->parent;
697   if (pe >= 0) {
698     CmiSyncSendAndFree(pe, (red->msg_end) - (red->msg_begin), red->msg_begin);
699   } else {
700     CPathSend(CPATH_DESTELT, &(red->dest), red->desteltno,
701               CPATH_TAGVEC, red->ntags, red->tags,
702               CPATH_REDBYTES, red->vecsize, red->eltsize, red->msg_data,
703               CPATH_END);
704     CmiFree(red->msg_begin);
705   }
706   /* free the reduction */
707   set = red->over;
708   hred = &(set->reductions);
709   while (*hred) {
710     if (*hred == red) *hred = red->next;
711     else hred = &((*hred)->next);
712   }
713   free(red);
714 }
715
716 void CPathMergeReduction(reduction red, void *data)
717 {
718   if (red->anydata) {
719     CmiHandlerToFunction(red->reducer)(red->vecsize, red->msg_data, data);
720   } else {
721     memcpy(red->msg_data, data, red->vecsize * red->eltsize);
722   }
723   red->anydata = 1;
724   red->waitcount--;
725   if (red->waitcount==0) CPathForwardReduction(red);
726 }
727
728 void CPathReduce(int key, ...)
729 {
730   CPath *over; int overidx[13]; 
731   CPath *dest; int destidx[13];
732   int desteltno;
733   int ntags, tags[256];
734   int reducer;
735   int vecsize, eltsize; void *data;
736   va_list p; int i;
737   eltset set; reduction red;
738
739
740   va_start(p, key);
741   if (key != CPATH_OVER) goto synerr;
742   over = va_arg(p, CPath *);
743   for (i=0; i<over->nsizes; i++)
744     overidx[i] = va_arg(p, int);
745   
746   key = va_arg(p, int);
747   switch(key) {
748   case CPATH_TAG:
749     ntags = 1;
750     tags[0] = va_arg(p, int);
751     break;
752   case CPATH_TAGS:
753     ntags = va_arg(p, int);
754     if (ntags > 256) goto synerr;
755     for (i=0; i<ntags; i++)
756       tags[i] = va_arg(p, int);
757     break;
758   default: goto synerr;
759   }
760   
761   key = va_arg(p, int);
762   if (key != CPATH_REDUCER) goto synerr;
763   reducer = va_arg(p,int);
764   
765   key = va_arg(p, int);
766   if (key != CPATH_DEST) goto synerr;
767   dest = va_arg(p, CPath *);
768   for (i=0; i<dest->nsizes; i++)
769     destidx[i] = va_arg(p,int);
770   desteltno = CPathIndicesToEltno(dest->nsizes, dest->sizes, destidx);
771   
772   key = va_arg(p, int);
773   if (key != CPATH_BYTES) goto synerr;
774   vecsize = va_arg(p, int);
775   eltsize = va_arg(p, int);
776   data = va_arg(p, void *);
777   
778   key = va_arg(p, int);
779   if (key != CPATH_END) goto synerr;
780
781   set = CPathGetEltset(over, overidx);
782   red = CPathGetReduction
783     (set, ntags, tags, vecsize, eltsize, reducer, dest, desteltno);
784   CPathMergeReduction(red, data);
785   return;
786   
787 synerr:
788   CmiError("CPathReduce: arglist must have these clauses: OVER, TAGS, REDUCER, DEST, BYTES, END (in that order).\n");
789   exit(1);
790 }
791
792 void CPathModuleInit()
793 {
794   CpvInitialize(int, seqno);
795   CpvInitialize(int, CPathSendIdx);
796   CpvInitialize(int, CPathReduceIdx);
797   CpvInitialize(eltset *, EltsetTable);
798   CpvInitialize(int, EltsetTabsize);
799   CpvInitialize(single *, SingleTable);
800   CpvInitialize(int, SingleTabsize);
801   CpvInitialize(reduction *, ReduceTable);
802   CpvInitialize(int, ReduceTabsize);
803   CtvInitialize(single, thisthread);
804   CpvInitialize(char *, PEflags);
805   CpvInitialize(int *, PElist);
806   
807   CpvAccess(seqno) = 0;
808   CpvAccess(CPathSendIdx) = CmiRegisterHandler(CPathSendHandler);
809   CpvAccess(CPathReduceIdx) = CmiRegisterHandler(CPathReduceHandler);
810   CpvAccess(EltsetTabsize) = 1091;
811   CpvAccess(EltsetTable) = (eltset*)calloc(1091,sizeof(eltset));
812   CpvAccess(SingleTabsize) = 1091;
813   CpvAccess(SingleTable) = (single*)calloc(1091,sizeof(single));
814   CpvAccess(ReduceTabsize) = 1091;
815   CpvAccess(ReduceTable) = (reduction*)calloc(1091,sizeof(reduction));
816   
817   CpvAccess(PEflags) = (char*)calloc(1,CmiNumPes());
818   CpvAccess(PElist) = (int*)malloc(CmiNumPes()*sizeof(int));
819   _MEMCHECK(CpvAccess(PElist));
820 }